diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/AbstractConnectionClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/AbstractConnectionClient.java index 2fea60ce9d7..c6f7d40f375 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/AbstractConnectionClient.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/AbstractConnectionClient.java @@ -17,8 +17,6 @@ package org.apache.dubbo.remoting.api.connection; import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; -import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.remoting.ChannelHandler; import org.apache.dubbo.remoting.RemotingException; import org.apache.dubbo.remoting.api.WireProtocol; @@ -32,9 +30,6 @@ public abstract class AbstractConnectionClient extends AbstractClient { - private static final ErrorTypeAwareLogger logger = - LoggerFactory.getErrorTypeAwareLogger(AbstractConnectionClient.class); - protected WireProtocol protocol; protected InetSocketAddress remote; diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java index c26d9352f49..5b842d33aff 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java @@ -18,8 +18,6 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.Version; -import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; -import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.threadpool.manager.ExecutorRepository; import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository; import org.apache.dubbo.common.utils.NetUtils; @@ -55,8 +53,6 @@ */ public abstract class AbstractClient extends AbstractEndpoint implements Client { - private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AbstractClient.class); - private final Lock connectLock = new ReentrantLock(); private final boolean needReconnect; @@ -148,7 +144,7 @@ protected AbstractClient() { private void initExecutor(URL url) { ExecutorRepository executorRepository = ExecutorRepository.getInstance(url.getOrDefaultApplicationModel()); - /** + /* * Consumer's executor is shared globally, provider ip doesn't need to be part of the thread name. * * Instance of url is InstanceAddressURL, so addParameter actually adds parameters into ServiceInstance, @@ -415,36 +411,26 @@ public String toString() { /** * Open client. - * - * @throws Throwable */ protected abstract void doOpen() throws Throwable; /** * Close client. - * - * @throws Throwable */ protected abstract void doClose() throws Throwable; /** * Connect to server. - * - * @throws Throwable */ protected abstract void doConnect() throws Throwable; /** * disConnect to server. - * - * @throws Throwable */ protected abstract void doDisConnect() throws Throwable; /** * Get the connected channel. - * - * @return channel */ protected abstract Channel getChannel(); } diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractEndpoint.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractEndpoint.java index cc199265d69..c5cf86996f7 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractEndpoint.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractEndpoint.java @@ -36,7 +36,7 @@ */ public abstract class AbstractEndpoint extends AbstractPeer implements Resetable { - private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AbstractEndpoint.class); + protected final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(getClass()); private Codec2 codec; diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java index bb50ddc304d..1977d2b52ff 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java @@ -17,8 +17,6 @@ package org.apache.dubbo.remoting.transport; import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; -import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.threadpool.manager.ExecutorRepository; import org.apache.dubbo.common.utils.ConcurrentHashSet; import org.apache.dubbo.common.utils.ExecutorUtil; @@ -45,7 +43,7 @@ * AbstractServer */ public abstract class AbstractServer extends AbstractEndpoint implements RemotingServer { - private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AbstractServer.class); + private Set executors = new ConcurrentHashSet<>(); private InetSocketAddress localAddress; private InetSocketAddress bindAddress; diff --git a/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/http3/netty4/Constants.java b/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/http3/netty4/Constants.java new file mode 100644 index 00000000000..1080305dc4e --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/http3/netty4/Constants.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.http3.netty4; + +public final class Constants { + + public static final String PIPELINE_CONFIGURATOR_KEY = "http3PipelineConfigurator"; + public static final CharSequence TRI_PING = "tri-ping"; + + private Constants() {} +} diff --git a/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/http3/netty4/NettyHttp3FrameCodec.java b/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/http3/netty4/NettyHttp3FrameCodec.java index 1737f2dca83..9ef885ed3ec 100644 --- a/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/http3/netty4/NettyHttp3FrameCodec.java +++ b/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/http3/netty4/NettyHttp3FrameCodec.java @@ -17,6 +17,7 @@ package org.apache.dubbo.remoting.http3.netty4; import org.apache.dubbo.common.io.StreamUtils; +import org.apache.dubbo.remoting.http12.HttpStatus; import org.apache.dubbo.remoting.http12.h2.Http2Header; import org.apache.dubbo.remoting.http12.h2.Http2InputMessageFrame; import org.apache.dubbo.remoting.http12.h2.Http2MetadataFrame; @@ -34,7 +35,9 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandler; import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http2.Http2Headers.PseudoHeaderName; import io.netty.incubator.codec.http3.DefaultHttp3DataFrame; +import io.netty.incubator.codec.http3.DefaultHttp3Headers; import io.netty.incubator.codec.http3.DefaultHttp3HeadersFrame; import io.netty.incubator.codec.http3.Http3DataFrame; import io.netty.incubator.codec.http3.Http3Headers; @@ -42,6 +45,8 @@ import io.netty.incubator.codec.http3.Http3RequestStreamInboundHandler; import io.netty.incubator.codec.quic.QuicStreamChannel; +import static org.apache.dubbo.remoting.http3.netty4.Constants.TRI_PING; + @Sharable public class NettyHttp3FrameCodec extends Http3RequestStreamInboundHandler implements ChannelOutboundHandler { @@ -49,7 +54,21 @@ public class NettyHttp3FrameCodec extends Http3RequestStreamInboundHandler imple @Override protected void channelRead(ChannelHandlerContext ctx, Http3HeadersFrame frame) { - ctx.fireChannelRead(new Http2MetadataFrame(getStreamId(ctx), new DefaultHttpHeaders(frame.headers()), false)); + Http3Headers headers = frame.headers(); + if (headers.contains(TRI_PING)) { + pingReceived(ctx); + return; + } + + ctx.fireChannelRead(new Http2MetadataFrame(getStreamId(ctx), new DefaultHttpHeaders(headers), false)); + } + + private void pingReceived(ChannelHandlerContext ctx) { + Http3Headers pongHeader = new DefaultHttp3Headers(false); + pongHeader.set(TRI_PING, "0"); + pongHeader.set(PseudoHeaderName.STATUS.value(), HttpStatus.OK.getStatusString()); + ctx.write(new DefaultHttp3HeadersFrame(pongHeader)); + ctx.close(); } @Override diff --git a/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/transport/netty4/Helper.java b/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/transport/netty4/Http3Helper.java similarity index 99% rename from dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/transport/netty4/Helper.java rename to dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/transport/netty4/Http3Helper.java index 654831fa14a..4ae742fd56e 100644 --- a/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/transport/netty4/Helper.java +++ b/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/transport/netty4/Http3Helper.java @@ -25,7 +25,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; -final class Helper { +final class Http3Helper { @SuppressWarnings("unchecked") static > T configCodec(QuicCodecBuilder builder, URL url) { diff --git a/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyHttp3ConnectionClient.java b/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyHttp3ConnectionClient.java index fc4388e75d8..a3ad10d8d85 100644 --- a/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyHttp3ConnectionClient.java +++ b/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyHttp3ConnectionClient.java @@ -21,18 +21,19 @@ import org.apache.dubbo.remoting.Constants; import org.apache.dubbo.remoting.RemotingException; import org.apache.dubbo.remoting.http3.Http3SslContexts; -import org.apache.dubbo.remoting.utils.UrlUtils; +import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.socket.nio.NioDatagramChannel; -import io.netty.handler.timeout.IdleStateHandler; import io.netty.incubator.codec.http3.Http3; import io.netty.incubator.codec.http3.Http3ClientConnectionHandler; import io.netty.incubator.codec.quic.QuicChannel; @@ -40,12 +41,12 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; -import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.dubbo.remoting.http3.netty4.Constants.PIPELINE_CONFIGURATOR_KEY; public final class NettyHttp3ConnectionClient extends AbstractNettyConnectionClient { + private Consumer pipelineConfigurator; private AtomicReference datagramChannel; - private QuicChannelBootstrap bootstrap; public NettyHttp3ConnectionClient(URL url, ChannelHandler handler) throws RemotingException { @@ -53,16 +54,17 @@ public NettyHttp3ConnectionClient(URL url, ChannelHandler handler) throws Remoti } @Override + @SuppressWarnings("unchecked") protected void initConnectionClient() { super.initConnectionClient(); datagramChannel = new AtomicReference<>(); + pipelineConfigurator = (Consumer) getUrl().getAttribute(PIPELINE_CONFIGURATOR_KEY); + Objects.requireNonNull(pipelineConfigurator, "pipelineConfigurator should be set"); } @Override protected void initBootstrap() throws Exception { - int idleTimeout = UrlUtils.getIdleTimeout(getUrl()); - io.netty.channel.ChannelHandler codec = Helper.configCodec(Http3.newQuicClientCodecBuilder(), getUrl()) - .maxIdleTimeout(idleTimeout, MILLISECONDS) + io.netty.channel.ChannelHandler codec = Http3Helper.configCodec(Http3.newQuicClientCodecBuilder(), getUrl()) .sslContext(Http3SslContexts.buildClientSslContext(getUrl())) .build(); io.netty.channel.Channel nettyDatagramChannel = new Bootstrap() @@ -81,17 +83,15 @@ protected void initChannel(NioDatagramChannel ch) { datagramChannel.set(nettyDatagramChannel); nettyDatagramChannel.closeFuture().addListener(channelFuture -> datagramChannel.set(null)); - int heartbeat = UrlUtils.getHeartbeat(getUrl()); NettyConnectionHandler connectionHandler = new NettyConnectionHandler(this); bootstrap = QuicChannel.newBootstrap(nettyDatagramChannel) .handler(new ChannelInitializer() { @Override protected void initChannel(QuicChannel ch) { - ch.pipeline() - .addLast(new IdleStateHandler(heartbeat, 0, 0, MILLISECONDS)) - .addLast(Constants.CONNECTION_HANDLER_NAME, connectionHandler) - .addLast(new Http3ClientConnectionHandler()); - + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(new Http3ClientConnectionHandler()); + pipeline.addLast(Constants.CONNECTION_HANDLER_NAME, connectionHandler); + pipelineConfigurator.accept(pipeline); ch.closeFuture().addListener(channelFuture -> clearNettyChannel()); } }) diff --git a/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyHttp3Server.java b/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyHttp3Server.java index cd45f5c286a..e9a8634f25f 100644 --- a/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyHttp3Server.java +++ b/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyHttp3Server.java @@ -18,104 +18,78 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.config.ConfigurationUtils; -import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; -import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.common.utils.NetUtils; import org.apache.dubbo.remoting.Channel; import org.apache.dubbo.remoting.ChannelHandler; import org.apache.dubbo.remoting.RemotingException; -import org.apache.dubbo.remoting.http12.netty4.HttpWriteQueueHandler; import org.apache.dubbo.remoting.http3.Http3SslContexts; -import org.apache.dubbo.remoting.http3.netty4.NettyHttp3FrameCodec; -import org.apache.dubbo.remoting.http3.netty4.NettyHttp3ProtocolSelectorHandler; import org.apache.dubbo.remoting.transport.AbstractServer; import org.apache.dubbo.remoting.transport.dispatcher.ChannelHandlers; -import org.apache.dubbo.remoting.utils.UrlUtils; -import org.apache.dubbo.rpc.model.FrameworkModel; -import org.apache.dubbo.rpc.model.ScopeModelUtil; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; import java.util.Map; +import java.util.Objects; +import java.util.function.Consumer; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.nio.NioDatagramChannel; -import io.netty.handler.timeout.IdleStateHandler; import io.netty.incubator.codec.http3.Http3; -import io.netty.incubator.codec.http3.Http3ServerConnectionHandler; import io.netty.incubator.codec.quic.InsecureQuicTokenHandler; import io.netty.incubator.codec.quic.QuicChannel; -import io.netty.incubator.codec.quic.QuicStreamChannel; import io.netty.util.concurrent.Future; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_CLOSE; import static org.apache.dubbo.remoting.Constants.EVENT_LOOP_BOSS_POOL_NAME; +import static org.apache.dubbo.remoting.http3.netty4.Constants.PIPELINE_CONFIGURATOR_KEY; public class NettyHttp3Server extends AbstractServer { - private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(NettyHttp3Server.class); - private Map channels; private Bootstrap bootstrap; private EventLoopGroup bossGroup; private io.netty.channel.Channel channel; + private final Consumer pipelineConfigurator; private final int serverShutdownTimeoutMills; + @SuppressWarnings("unchecked") public NettyHttp3Server(URL url, ChannelHandler handler) throws RemotingException { super(url, ChannelHandlers.wrap(handler, url)); + pipelineConfigurator = (Consumer) getUrl().getAttribute(PIPELINE_CONFIGURATOR_KEY); + Objects.requireNonNull(pipelineConfigurator, "pipelineConfigurator should be set"); serverShutdownTimeoutMills = ConfigurationUtils.getServerShutdownTimeout(getUrl().getOrDefaultModuleModel()); } @Override protected void doOpen() throws Throwable { bootstrap = new Bootstrap(); - bossGroup = NettyEventLoopFactory.eventLoopGroup(1, EVENT_LOOP_BOSS_POOL_NAME); - NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); channels = nettyServerHandler.getChannels(); - - FrameworkModel frameworkModel = ScopeModelUtil.getFrameworkModel(getUrl().getScopeModel()); - NettyHttp3ProtocolSelectorHandler selectorHandler = - new NettyHttp3ProtocolSelectorHandler(getUrl(), frameworkModel); - - int idleTimeout = UrlUtils.getIdleTimeout(getUrl()); - io.netty.channel.ChannelHandler codec = Helper.configCodec(Http3.newQuicServerCodecBuilder(), getUrl()) - .sslContext(Http3SslContexts.buildServerSslContext(getUrl())) - .maxIdleTimeout(idleTimeout, MILLISECONDS) - .tokenHandler(InsecureQuicTokenHandler.INSTANCE) - .handler(new ChannelInitializer() { - @Override - protected void initChannel(QuicChannel ch) { - ch.pipeline() - .addLast(nettyServerHandler) - .addLast(new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS)) - .addLast(new Http3ServerConnectionHandler(new ChannelInitializer() { - @Override - protected void initChannel(QuicStreamChannel ch) { - ch.pipeline() - .addLast(NettyHttp3FrameCodec.INSTANCE) - .addLast(new HttpWriteQueueHandler()) - .addLast(selectorHandler); - } - })); - } - }) - .build(); - - // bind try { ChannelFuture channelFuture = bootstrap .group(bossGroup) .channel(NioDatagramChannel.class) - .handler(codec) + .handler(Http3Helper.configCodec(Http3.newQuicServerCodecBuilder(), getUrl()) + .sslContext(Http3SslContexts.buildServerSslContext(getUrl())) + .tokenHandler(InsecureQuicTokenHandler.INSTANCE) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(QuicChannel ch) { + ChannelPipeline pipeline = ch.pipeline(); + pipelineConfigurator.accept(pipeline); + pipeline.addLast(nettyServerHandler); + } + }) + .build()) .bind(getBindAddress()); channelFuture.syncUninterruptibly(); channel = channelFuture.channel(); diff --git a/dubbo-remoting/dubbo-remoting-netty/src/main/java/org/apache/dubbo/remoting/transport/netty/NettyClient.java b/dubbo-remoting/dubbo-remoting-netty/src/main/java/org/apache/dubbo/remoting/transport/netty/NettyClient.java index 6ed419f3a35..87cc9dd4459 100644 --- a/dubbo-remoting/dubbo-remoting-netty/src/main/java/org/apache/dubbo/remoting/transport/netty/NettyClient.java +++ b/dubbo-remoting/dubbo-remoting-netty/src/main/java/org/apache/dubbo/remoting/transport/netty/NettyClient.java @@ -18,8 +18,6 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.Version; -import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; -import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.utils.NamedThreadFactory; import org.apache.dubbo.common.utils.NetUtils; import org.apache.dubbo.remoting.ChannelHandler; @@ -48,8 +46,6 @@ */ public class NettyClient extends AbstractClient { - private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(NettyClient.class); - // ChannelFactory's closure has a DirectMemory leak, using static to avoid // https://issues.jboss.org/browse/NETTY-424 private static final ChannelFactory CHANNEL_FACTORY = new NioClientSocketChannelFactory( diff --git a/dubbo-remoting/dubbo-remoting-netty/src/main/java/org/apache/dubbo/remoting/transport/netty/NettyHandler.java b/dubbo-remoting/dubbo-remoting-netty/src/main/java/org/apache/dubbo/remoting/transport/netty/NettyHandler.java index d864639dacc..e9e62b7bb99 100644 --- a/dubbo-remoting/dubbo-remoting-netty/src/main/java/org/apache/dubbo/remoting/transport/netty/NettyHandler.java +++ b/dubbo-remoting/dubbo-remoting-netty/src/main/java/org/apache/dubbo/remoting/transport/netty/NettyHandler.java @@ -78,7 +78,7 @@ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) thr NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); } - if (logger.isInfoEnabled()) { + if (logger.isInfoEnabled() && channel != null) { logger.info("The connection between " + channel.getRemoteAddress() + " and " + channel.getLocalAddress() + " is established"); } diff --git a/dubbo-remoting/dubbo-remoting-netty/src/main/java/org/apache/dubbo/remoting/transport/netty/NettyPortUnificationServer.java b/dubbo-remoting/dubbo-remoting-netty/src/main/java/org/apache/dubbo/remoting/transport/netty/NettyPortUnificationServer.java index 822b9e1fb1a..dedf9cb8567 100644 --- a/dubbo-remoting/dubbo-remoting-netty/src/main/java/org/apache/dubbo/remoting/transport/netty/NettyPortUnificationServer.java +++ b/dubbo-remoting/dubbo-remoting-netty/src/main/java/org/apache/dubbo/remoting/transport/netty/NettyPortUnificationServer.java @@ -17,8 +17,6 @@ package org.apache.dubbo.remoting.transport.netty; import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; -import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.common.utils.NamedThreadFactory; import org.apache.dubbo.common.utils.NetUtils; @@ -58,9 +56,6 @@ */ public class NettyPortUnificationServer extends AbstractPortUnificationServer { - private static final ErrorTypeAwareLogger logger = - LoggerFactory.getErrorTypeAwareLogger(NettyPortUnificationServer.class); - private Map dubboChannels = new ConcurrentHashMap<>(); // private ServerBootstrap bootstrap; diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/AbstractNettyConnectionClient.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/AbstractNettyConnectionClient.java index a9f983f56d6..f26ca4d8196 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/AbstractNettyConnectionClient.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/AbstractNettyConnectionClient.java @@ -18,8 +18,6 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.Version; -import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; -import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.utils.NetUtils; import org.apache.dubbo.remoting.Channel; import org.apache.dubbo.remoting.ChannelHandler; @@ -45,9 +43,6 @@ public abstract class AbstractNettyConnectionClient extends AbstractConnectionClient { - private static final ErrorTypeAwareLogger LOGGER = - LoggerFactory.getErrorTypeAwareLogger(AbstractNettyConnectionClient.class); - private AtomicReference> connectingPromiseRef; private AtomicReference channelRef; @@ -94,8 +89,8 @@ protected void initConnectionClient() { protected void doClose() { // AbstractPeer close can set closed true. if (isClosed()) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(String.format("Connection:%s freed ", this)); + if (logger.isDebugEnabled()) { + logger.debug("Connection:{} freed", this); } performClose(); closePromise.setSuccess(null); @@ -117,9 +112,14 @@ protected void doConnect() throws RemotingException { } if (isClosed()) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(String.format("%s aborted to reconnect cause connection closed. ", this)); + if (logger.isDebugEnabled()) { + logger.debug("Connection:{} aborted to reconnect cause connection closed", this); } + return; + } + + if (logger.isDebugEnabled()) { + logger.debug("Connection:{} attempting to reconnect to server {}", this, getConnectAddress()); } init.compareAndSet(false, true); @@ -145,11 +145,11 @@ protected void doConnect() throws RemotingException { + ", error message is:" + cause.getMessage(), cause); - LOGGER.error( + logger.error( TRANSPORT_FAILED_CONNECT_PROVIDER, "network disconnected", "", - "Failed to connect to provider server by other reason.", + "Failed to connect to provider server by other reason", cause); throw remotingException; @@ -163,8 +163,8 @@ protected void doConnect() throws RemotingException { + " using dubbo version " + Version.getVersion()); - LOGGER.error( - TRANSPORT_CLIENT_CONNECT_TIMEOUT, "provider crash", "", "Client-side timeout.", remotingException); + logger.error( + TRANSPORT_CLIENT_CONNECT_TIMEOUT, "provider crash", "", "Client-side timeout", remotingException); throw remotingException; } @@ -177,6 +177,17 @@ protected void doDisConnect() { NettyChannel.removeChannelIfDisconnected(getNettyChannel()); } + protected void doReconnect() { + connectivityExecutor.execute(() -> { + try { + doConnect(); + } catch (RemotingException e) { + logger.error( + TRANSPORT_FAILED_RECONNECT, "", "", "Failed to reconnect to server: " + getConnectAddress()); + } + }); + } + @Override public void onConnected(Object channel) { if (!(channel instanceof io.netty.channel.Channel)) { @@ -186,8 +197,8 @@ public void onConnected(Object channel) { io.netty.channel.Channel nettyChannel = ((io.netty.channel.Channel) channel); if (isClosed()) { nettyChannel.close(); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(String.format("%s is closed, ignoring connected event", this)); + if (logger.isDebugEnabled()) { + logger.debug("Connection:{} is closed, ignoring connected event", this); } return; } @@ -211,8 +222,8 @@ public void onConnected(Object channel) { // Notify the connection is available. connectedPromise.trySuccess(null); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(String.format("%s connected ", this)); + if (logger.isDebugEnabled()) { + logger.debug("Connection:{} connected", this); } } @@ -229,8 +240,8 @@ public void onGoaway(Object channel) { nettyChannel.close(); } NettyChannel.removeChannelIfDisconnected(nettyChannel); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(String.format("%s goaway", this)); + if (logger.isDebugEnabled()) { + logger.debug("Connection:{} goaway", this); } } } @@ -272,7 +283,7 @@ public boolean isAvailable() { try { doConnect(); } catch (RemotingException e) { - LOGGER.error(TRANSPORT_FAILED_RECONNECT, "", "", "Failed to connect to server: " + getConnectAddress()); + logger.error(TRANSPORT_FAILED_RECONNECT, "", "", "Failed to connect to server: " + getConnectAddress()); } } @@ -349,36 +360,25 @@ public void operationComplete(ChannelFuture future) { } AbstractNettyConnectionClient connectionClient = AbstractNettyConnectionClient.this; if (connectionClient.isClosed() || connectionClient.getCounter() == 0) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(String.format( - "%s aborted to reconnect. %s", - connectionClient, future.cause().getMessage())); + if (logger.isDebugEnabled()) { + logger.debug( + "Connection:{} aborted to reconnect. {}", + connectionClient, + future.cause().getMessage()); } return; } - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(String.format( - "%s is reconnecting, attempt=%d cause=%s", - connectionClient, 0, future.cause().getMessage())); + if (logger.isDebugEnabled()) { + logger.debug( + "Connection:{} is reconnecting, attempt=0 cause={}", + connectionClient, + future.cause().getMessage()); } // Notify the connection is unavailable. disconnectedPromise.trySuccess(null); - connectivityExecutor.schedule( - () -> { - try { - connectionClient.doConnect(); - } catch (RemotingException e) { - LOGGER.error( - TRANSPORT_FAILED_RECONNECT, - "", - "", - "Failed to connect to server: " + getConnectAddress()); - } - }, - reconnectDuration, - TimeUnit.MILLISECONDS); + doReconnect(); } } } diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/AddressUtils.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/AddressUtils.java index 85ffb29aa39..6746e339d31 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/AddressUtils.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/AddressUtils.java @@ -16,7 +16,6 @@ */ package org.apache.dubbo.remoting.transport.netty4; -import org.apache.dubbo.common.utils.NetUtils; import org.apache.dubbo.rpc.model.FrameworkModel; import java.net.InetSocketAddress; @@ -25,11 +24,17 @@ import io.netty.channel.Channel; +import static org.apache.dubbo.common.utils.NetUtils.toAddressString; + public final class AddressUtils { private static final List ACCESSORS = FrameworkModel.defaultModel().getActivateExtensions(ChannelAddressAccessor.class); + private static final String LOCAL_ADDRESS_KEY = "NETTY_LOCAL_ADDRESS_KEY"; + private static final String REMOTE_ADDRESS_KEY = "NETTY_REMOTE_ADDRESS_KEY"; + private static final String PROTOCOL_KEY = "NETTY_PROTOCOL_KEY"; + private AddressUtils() {} public static InetSocketAddress getRemoteAddress(Channel channel) { @@ -54,35 +59,50 @@ public static InetSocketAddress getLocalAddress(Channel channel) { return (InetSocketAddress) channel.localAddress(); } - public static String getRemoteAddressKey(Channel channel) { - InetSocketAddress address; + static void initAddressIfNecessary(NettyChannel nettyChannel) { + Channel channel = nettyChannel.getNioChannel(); + SocketAddress address = channel.localAddress(); + if (address instanceof InetSocketAddress) { + return; + } + for (int i = 0, size = ACCESSORS.size(); i < size; i++) { ChannelAddressAccessor accessor = ACCESSORS.get(i); - address = accessor.getRemoteAddress(channel); - if (address != null) { - return accessor.getProtocol() + ' ' + NetUtils.toAddressString(address); + InetSocketAddress localAddress = accessor.getLocalAddress(channel); + if (localAddress != null) { + nettyChannel.setAttribute(LOCAL_ADDRESS_KEY, localAddress); + nettyChannel.setAttribute(REMOTE_ADDRESS_KEY, accessor.getRemoteAddress(channel)); + nettyChannel.setAttribute(PROTOCOL_KEY, accessor.getProtocol()); + break; } } - InetSocketAddress remoteAddress = (InetSocketAddress) channel.remoteAddress(); - if (remoteAddress == null) { + } + + static InetSocketAddress getLocalAddress(NettyChannel channel) { + InetSocketAddress address = (InetSocketAddress) channel.getAttribute(LOCAL_ADDRESS_KEY); + return address == null ? (InetSocketAddress) (channel.getNioChannel().localAddress()) : address; + } + + static InetSocketAddress getRemoteAddress(NettyChannel channel) { + InetSocketAddress address = (InetSocketAddress) channel.getAttribute(REMOTE_ADDRESS_KEY); + return address == null ? (InetSocketAddress) (channel.getNioChannel().remoteAddress()) : address; + } + + static String getLocalAddressKey(NettyChannel channel) { + InetSocketAddress address = getLocalAddress(channel); + if (address == null) { return "UNKNOWN"; } - return NetUtils.toAddressString(remoteAddress); + String protocol = (String) channel.getAttribute(PROTOCOL_KEY); + return protocol == null ? toAddressString(address) : protocol + ' ' + toAddressString(address); } - public static String getLocalAddressKey(Channel channel) { - InetSocketAddress address; - for (int i = 0, size = ACCESSORS.size(); i < size; i++) { - ChannelAddressAccessor accessor = ACCESSORS.get(i); - address = accessor.getLocalAddress(channel); - if (address != null) { - return accessor.getProtocol() + ' ' + NetUtils.toAddressString(address); - } - } - SocketAddress localAddress = channel.localAddress(); - if (localAddress == null) { + static String getRemoteAddressKey(NettyChannel channel) { + InetSocketAddress address = getRemoteAddress(channel); + if (address == null) { return "UNKNOWN"; } - return NetUtils.toAddressString((InetSocketAddress) localAddress); + String protocol = (String) channel.getAttribute(PROTOCOL_KEY); + return protocol == null ? toAddressString(address) : protocol + ' ' + toAddressString(address); } } diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannel.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannel.java index 8191cdfd483..715438a7389 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannel.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannel.java @@ -35,6 +35,7 @@ import java.net.InetSocketAddress; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -73,16 +74,16 @@ final class NettyChannel extends AbstractChannel { private final Netty4BatchWriteQueue writeQueue; - private Codec2 codec; - private final boolean encodeInIOThread; + private Codec2 codec; + /** * The constructor of NettyChannel. * It is private so NettyChannel usually create by {@link NettyChannel#getOrAddChannel(Channel, URL, ChannelHandler)} * * @param channel netty channel - * @param url + * @param url dubbo url * @param handler dubbo handler that contain netty handler */ private NettyChannel(Channel channel, URL url, ChannelHandler handler) { @@ -94,6 +95,7 @@ private NettyChannel(Channel channel, URL url, ChannelHandler handler) { this.writeQueue = Netty4BatchWriteQueue.createWriteQueue(channel); this.codec = getChannelCodec(url); this.encodeInIOThread = getUrl().getParameter(ENCODE_IN_IO_THREAD_KEY, DEFAULT_ENCODE_IN_IO_THREAD); + AddressUtils.initAddressIfNecessary(this); } /** @@ -101,9 +103,8 @@ private NettyChannel(Channel channel, URL url, ChannelHandler handler) { * Put netty channel into it if dubbo channel don't exist in the cache. * * @param ch netty channel - * @param url + * @param url dubbo url * @param handler dubbo handler that contain netty's handler - * @return */ static NettyChannel getOrAddChannel(Channel ch, URL url, ChannelHandler handler) { if (ch == null) { @@ -150,12 +151,20 @@ static void removeChannel(Channel ch) { @Override public InetSocketAddress getLocalAddress() { - return AddressUtils.getLocalAddress(channel); + return AddressUtils.getLocalAddress(this); } @Override public InetSocketAddress getRemoteAddress() { - return AddressUtils.getRemoteAddress(channel); + return AddressUtils.getRemoteAddress(this); + } + + public String getLocalAddressKey() { + return AddressUtils.getLocalAddressKey(this); + } + + public String getRemoteAddressKey() { + return AddressUtils.getRemoteAddressKey(this); } @Override @@ -172,7 +181,7 @@ public void markActive(boolean isActive) { } /** - * Send message by netty and whether to wait the completion of the send. + * Send message by netty and whether to wait the completion of the sending. * * @param message message that need send. * @param sent whether to ack async-sent @@ -193,23 +202,20 @@ public void send(Object message, boolean sent) throws RemotingException { codec.encode(this, buffer, message); outputMessage = buf; } - ChannelFuture future = writeQueue.enqueue(outputMessage).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!(message instanceof Request)) { + ChannelFuture future = writeQueue.enqueue(outputMessage).addListener((ChannelFutureListener) f -> { + if (!(message instanceof Request)) { + return; + } + ChannelHandler handler = getChannelHandler(); + if (f.isSuccess()) { + handler.sent(NettyChannel.this, message); + } else { + Throwable t = f.cause(); + if (t == null) { return; } - ChannelHandler handler = getChannelHandler(); - if (future.isSuccess()) { - handler.sent(NettyChannel.this, message); - } else { - Throwable t = future.cause(); - if (t == null) { - return; - } - Response response = buildErrorResponse((Request) message, t); - handler.received(NettyChannel.this, response); - } + Response response = buildErrorResponse((Request) message, t); + handler.received(NettyChannel.this, response); } }); @@ -318,18 +324,7 @@ public boolean equals(Object obj) { return channel.equals(client.getNettyChannel()); } - if (getClass() != obj.getClass()) { - return false; - } - NettyChannel other = (NettyChannel) obj; - if (channel == null) { - if (other.channel != null) { - return false; - } - } else if (!channel.equals(other.channel)) { - return false; - } - return true; + return getClass() == obj.getClass() && Objects.equals(channel, ((NettyChannel) obj).channel); } @Override @@ -359,6 +354,7 @@ private static Response buildErrorResponse(Request request, Throwable t) { return response; } + @SuppressWarnings("deprecation") private static Codec2 getChannelCodec(URL url) { String codecName = url.getParameter(Constants.CODEC_KEY); if (StringUtils.isEmpty(codecName)) { diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannelHandler.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannelHandler.java index 1c31861c5e3..99ce684b150 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannelHandler.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannelHandler.java @@ -56,8 +56,8 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { logger.info( "The connection {} of {} -> {} is established.", ch, - AddressUtils.getRemoteAddressKey(ch), - AddressUtils.getLocalAddressKey(ch)); + channel.getRemoteAddressKey(), + channel.getLocalAddressKey()); } } } @@ -71,12 +71,13 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { dubboChannels.remove(NetUtils.toAddressString((InetSocketAddress) ch.remoteAddress())); if (channel != null) { handler.disconnected(channel); + if (logger.isInfoEnabled()) { logger.info( "The connection {} of {} -> {} is disconnected.", ch, - AddressUtils.getRemoteAddressKey(ch), - AddressUtils.getLocalAddressKey(ch)); + channel.getRemoteAddressKey(), + channel.getLocalAddressKey()); } } } finally { diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClient.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClient.java index d01a4810492..bd4d89f5b70 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClient.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClient.java @@ -20,8 +20,6 @@ import org.apache.dubbo.common.Version; import org.apache.dubbo.common.config.ConfigurationUtils; import org.apache.dubbo.common.constants.CommonConstants; -import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; -import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.resource.GlobalResourceInitializer; import org.apache.dubbo.common.utils.NetUtils; import org.apache.dubbo.common.utils.StringUtils; @@ -67,8 +65,6 @@ public class NettyClient extends AbstractClient { private static final String DEFAULT_SOCKS_PROXY_PORT = "1080"; - private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(NettyClient.class); - /** * netty client bootstrap */ diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java index ed77458cf00..449b39385ae 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java @@ -58,12 +58,13 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel ch = ctx.channel(); NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler); handler.connected(channel); + if (logger.isInfoEnabled()) { logger.info( "The connection {} of {} -> {} is established.", ch, - AddressUtils.getLocalAddressKey(ch), - AddressUtils.getRemoteAddressKey(ch)); + channel.getLocalAddressKey(), + channel.getRemoteAddressKey()); } } @@ -81,8 +82,8 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { logger.info( "The connection {} of {} -> {} is disconnected.", ch, - AddressUtils.getLocalAddressKey(ch), - AddressUtils.getRemoteAddressKey(ch)); + channel.getLocalAddressKey(), + channel.getRemoteAddressKey()); } } diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionHandler.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionHandler.java index 68bffdc9529..9c4d08c0cee 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionHandler.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionHandler.java @@ -30,7 +30,6 @@ import io.netty.util.Attribute; import io.netty.util.AttributeKey; -import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_RECONNECT; import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_UNEXPECTED_EXCEPTION; @Sharable @@ -62,7 +61,7 @@ public void onGoAway(Object channel) { connectionClient.onGoaway(nettyChannel); } if (LOGGER.isDebugEnabled()) { - LOGGER.debug(String.format("Channel %s go away ,schedule reconnect", nettyChannel)); + LOGGER.debug("Channel {} go away ,schedule reconnect", nettyChannel); } reconnect(nettyChannel); } @@ -74,43 +73,30 @@ public void reconnect(Object channel) { } Channel nettyChannel = ((Channel) channel); if (LOGGER.isDebugEnabled()) { - LOGGER.debug(String.format("Connection %s is reconnecting, attempt=%d", connectionClient, 1)); + LOGGER.debug("Connection:{} is reconnecting, attempt={}", connectionClient, 1); } EventLoop eventLoop = nettyChannel.eventLoop(); if (connectionClient.isClosed()) { - LOGGER.info("The client has been closed and will not reconnect. "); + LOGGER.info("The connection {} has been closed and will not reconnect", connectionClient); return; } - eventLoop.schedule( - () -> { - try { - connectionClient.doConnect(); - } catch (Throwable e) { - LOGGER.error( - TRANSPORT_FAILED_RECONNECT, - "", - "", - "Fail to connect to " + connectionClient.getChannel(), - e); - } - }, - 1, - TimeUnit.SECONDS); + eventLoop.schedule(connectionClient::doReconnect, 1, TimeUnit.SECONDS); } @Override public void channelActive(ChannelHandlerContext ctx) { ctx.fireChannelActive(); Channel ch = ctx.channel(); - NettyChannel.getOrAddChannel(ch, connectionClient.getUrl(), connectionClient); + NettyChannel channel = NettyChannel.getOrAddChannel(ch, connectionClient.getUrl(), connectionClient); if (!connectionClient.isClosed()) { connectionClient.onConnected(ch); - if (LOGGER.isInfoEnabled()) { + + if (LOGGER.isInfoEnabled() && channel != null) { LOGGER.info( "The connection {} of {} -> {} is established.", ch, - AddressUtils.getLocalAddressKey(ch), - AddressUtils.getRemoteAddressKey(ch)); + channel.getLocalAddressKey(), + channel.getRemoteAddressKey()); } } else { ctx.close(); @@ -121,16 +107,20 @@ public void channelActive(ChannelHandlerContext ctx) { public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); Channel ch = ctx.channel(); + NettyChannel channel = NettyChannel.getOrAddChannel(ch, connectionClient.getUrl(), connectionClient); try { Attribute goawayAttr = ch.attr(GO_AWAY_KEY); if (!Boolean.TRUE.equals(goawayAttr.get())) { reconnect(ch); } - LOGGER.info( - "The connection {} of {} -> {} is disconnected.", - ch, - AddressUtils.getLocalAddressKey(ch), - AddressUtils.getRemoteAddressKey(ch)); + + if (LOGGER.isInfoEnabled() && channel != null) { + LOGGER.info( + "The connection {} of {} -> {} is disconnected.", + ch, + channel.getLocalAddressKey(), + channel.getRemoteAddressKey()); + } } finally { NettyChannel.removeChannel(ch); } diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServer.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServer.java index 4af9649fc79..5786f3a9db9 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServer.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServer.java @@ -18,8 +18,6 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.config.ConfigurationUtils; -import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; -import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.common.utils.NetUtils; import org.apache.dubbo.remoting.Channel; @@ -59,9 +57,6 @@ */ public class NettyPortUnificationServer extends AbstractPortUnificationServer { - private static final ErrorTypeAwareLogger logger = - LoggerFactory.getErrorTypeAwareLogger(NettyPortUnificationServer.class); - private final int serverShutdownTimeoutMills; /** * netty server bootstrap. diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java index 6de1969cf19..504931c90f2 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java @@ -18,8 +18,6 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.config.ConfigurationUtils; -import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; -import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.utils.ClassUtils; import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.common.utils.NetUtils; @@ -64,7 +62,6 @@ */ public class NettyServer extends AbstractServer { - private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(NettyServer.class); /** * the cache for alive worker channel. * diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServerHandler.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServerHandler.java index c9987dcdb15..c0eac5df970 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServerHandler.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServerHandler.java @@ -75,8 +75,8 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { logger.info( "The connection {} of {} -> {} is established.", ch, - AddressUtils.getRemoteAddressKey(ch), - AddressUtils.getLocalAddressKey(ch)); + channel.getLocalAddressKey(), + channel.getRemoteAddressKey()); } } @@ -95,8 +95,8 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { logger.info( "The connection {} of {} -> {} is disconnected.", ch, - AddressUtils.getRemoteAddressKey(ch), - AddressUtils.getLocalAddressKey(ch)); + channel.getRemoteAddressKey(), + channel.getLocalAddressKey()); } } diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Http3Exchanger.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Http3Exchanger.java index 247fe70dc82..3394a81295a 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Http3Exchanger.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Http3Exchanger.java @@ -19,25 +19,42 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.config.Configuration; import org.apache.dubbo.common.constants.LoggerCodeConstants; -import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; -import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.common.logger.FluentLogger; import org.apache.dubbo.common.utils.ClassUtils; import org.apache.dubbo.remoting.ChannelHandler; import org.apache.dubbo.remoting.RemotingException; import org.apache.dubbo.remoting.RemotingServer; import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient; +import org.apache.dubbo.remoting.http12.netty4.HttpWriteQueueHandler; +import org.apache.dubbo.remoting.http3.netty4.NettyHttp3FrameCodec; +import org.apache.dubbo.remoting.http3.netty4.NettyHttp3ProtocolSelectorHandler; import org.apache.dubbo.remoting.transport.ChannelHandlerAdapter; import org.apache.dubbo.remoting.transport.netty4.NettyHttp3Server; +import org.apache.dubbo.remoting.utils.UrlUtils; import org.apache.dubbo.rpc.Constants; +import org.apache.dubbo.rpc.model.ScopeModelUtil; +import org.apache.dubbo.rpc.protocol.tri.h3.Http3ClientFrameCodec; +import org.apache.dubbo.rpc.protocol.tri.h3.Http3TripleServerConnectionHandler; import org.apache.dubbo.rpc.protocol.tri.h3.negotiation.Helper; import java.util.ArrayList; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.flush.FlushConsolidationHandler; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.incubator.codec.http3.Http3ServerConnectionHandler; +import io.netty.incubator.codec.quic.QuicStreamChannel; + +import static org.apache.dubbo.remoting.http3.netty4.Constants.PIPELINE_CONFIGURATOR_KEY; public final class Http3Exchanger { - private static final ErrorTypeAwareLogger LOGGER = LoggerFactory.getErrorTypeAwareLogger(Http3Exchanger.class); + private static final FluentLogger LOGGER = FluentLogger.of(Http3Exchanger.class); private static final boolean HAS_NETTY_HTTP3 = ClassUtils.isPresent("io.netty.incubator.codec.http3.Http3"); private static final Map SERVERS = new ConcurrentHashMap<>(); private static final Map CLIENTS = new ConcurrentHashMap<>(16); @@ -65,7 +82,8 @@ public static RemotingServer bind(URL url) { if (isEnabled(url)) { return SERVERS.computeIfAbsent(url.getAddress(), addr -> { try { - return new NettyHttp3Server(url, HANDLER); + URL serverUrl = url.putAttribute(PIPELINE_CONFIGURATOR_KEY, configServerPipeline(url)); + return new NettyHttp3Server(serverUrl, HANDLER); } catch (RemotingException e) { throw new RuntimeException(e); } @@ -74,12 +92,31 @@ public static RemotingServer bind(URL url) { return null; } + private static Consumer configServerPipeline(URL url) { + NettyHttp3ProtocolSelectorHandler selectorHandler = + new NettyHttp3ProtocolSelectorHandler(url, ScopeModelUtil.getFrameworkModel(url.getScopeModel())); + return pipeline -> { + pipeline.addLast(new Http3ServerConnectionHandler(new ChannelInitializer() { + @Override + protected void initChannel(QuicStreamChannel ch) { + ch.pipeline() + .addLast(new HttpWriteQueueHandler()) + .addLast(new FlushConsolidationHandler(64, true)) + .addLast(NettyHttp3FrameCodec.INSTANCE) + .addLast(selectorHandler); + } + })); + pipeline.addLast(new Http3TripleServerConnectionHandler()); + }; + } + public static AbstractConnectionClient connect(URL url) { return CLIENTS.compute(url.getAddress(), (address, client) -> { if (client == null) { + URL clientUrl = url.putAttribute(PIPELINE_CONFIGURATOR_KEY, configClientPipeline(url)); AbstractConnectionClient connectionClient = NEGOTIATION_ENABLED - ? Helper.createAutoSwitchClient(url, HANDLER) - : Helper.createHttp3Client(url, HANDLER); + ? Helper.createAutoSwitchClient(clientUrl, HANDLER) + : Helper.createHttp3Client(clientUrl, HANDLER); connectionClient.addCloseListener(() -> CLIENTS.remove(address, connectionClient)); client = connectionClient; } else { @@ -89,6 +126,16 @@ public static AbstractConnectionClient connect(URL url) { }); } + private static Consumer configClientPipeline(URL url) { + int heartbeat = UrlUtils.getHeartbeat(url); + int closeTimeout = UrlUtils.getCloseTimeout(url); + return pipeline -> { + pipeline.addLast(Http3ClientFrameCodec.INSTANCE); + pipeline.addLast(new IdleStateHandler(heartbeat, 0, 0, TimeUnit.MILLISECONDS)); + pipeline.addLast(new TriplePingPongHandler(closeTimeout)); + }; + } + public static void close() { if (SERVERS.isEmpty()) { return; @@ -99,7 +146,7 @@ public static void close() { try { server.close(); } catch (Throwable t) { - LOGGER.error(LoggerCodeConstants.PROTOCOL_ERROR_CLOSE_SERVER, "", "", "Close Http3 server failed", t); + LOGGER.error(LoggerCodeConstants.PROTOCOL_ERROR_CLOSE_SERVER, "Close http3 server failed", t); } } } diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3ClientFrameCodec.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3ClientFrameCodec.java index 11ef9da285e..da4e9cd9e99 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3ClientFrameCodec.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3ClientFrameCodec.java @@ -16,6 +16,10 @@ */ package org.apache.dubbo.rpc.protocol.tri.h3; +import org.apache.dubbo.common.logger.FluentLogger; +import org.apache.dubbo.remoting.http12.HttpConstants; +import org.apache.dubbo.remoting.http12.HttpMethods; +import org.apache.dubbo.remoting.http3.netty4.Constants; import org.apache.dubbo.remoting.http3.netty4.Http2HeadersAdapter; import org.apache.dubbo.remoting.http3.netty4.Http3HeadersAdapter; import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum; @@ -24,33 +28,49 @@ import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http2.DefaultHttp2DataFrame; import io.netty.handler.codec.http2.DefaultHttp2GoAwayFrame; import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame; +import io.netty.handler.codec.http2.DefaultHttp2PingFrame; import io.netty.handler.codec.http2.DefaultHttp2ResetFrame; import io.netty.handler.codec.http2.Http2DataFrame; +import io.netty.handler.codec.http2.Http2Headers.PseudoHeaderName; import io.netty.handler.codec.http2.Http2HeadersFrame; +import io.netty.handler.codec.http2.Http2PingFrame; import io.netty.incubator.codec.http3.DefaultHttp3DataFrame; +import io.netty.incubator.codec.http3.DefaultHttp3Headers; import io.netty.incubator.codec.http3.DefaultHttp3HeadersFrame; +import io.netty.incubator.codec.http3.Http3; import io.netty.incubator.codec.http3.Http3DataFrame; import io.netty.incubator.codec.http3.Http3ErrorCode; import io.netty.incubator.codec.http3.Http3Exception; import io.netty.incubator.codec.http3.Http3GoAwayFrame; +import io.netty.incubator.codec.http3.Http3Headers; import io.netty.incubator.codec.http3.Http3HeadersFrame; +import io.netty.incubator.codec.http3.Http3RequestStreamInitializer; +import io.netty.incubator.codec.quic.QuicChannel; import io.netty.incubator.codec.quic.QuicStreamChannel; +import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_RECONNECT; + @Sharable public class Http3ClientFrameCodec extends ChannelDuplexHandler { + private static final FluentLogger LOGGER = FluentLogger.of(Http3ClientFrameCodec.class); public static final Http3ClientFrameCodec INSTANCE = new Http3ClientFrameCodec(); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { if (msg instanceof Http3HeadersFrame) { - Http2HeadersAdapter headers = new Http2HeadersAdapter(((Http3HeadersFrame) msg).headers()); - boolean endStream = headers.contains(TripleHeaderEnum.STATUS_KEY.getKey()); - ctx.fireChannelRead(new DefaultHttp2HeadersFrame(headers, endStream)); + Http3Headers headers = ((Http3HeadersFrame) msg).headers(); + if (headers.contains(Constants.TRI_PING)) { + pingAck(ctx); + } else { + boolean endStream = headers.contains(TripleHeaderEnum.STATUS_KEY.getKey()); + ctx.fireChannelRead(new DefaultHttp2HeadersFrame(new Http2HeadersAdapter(headers), endStream)); + } } else if (msg instanceof Http3DataFrame) { ctx.fireChannelRead(new DefaultHttp2DataFrame(((Http3DataFrame) msg).content())); } else if (msg instanceof Http3GoAwayFrame) { @@ -60,9 +80,19 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { } } + private void pingAck(ChannelHandlerContext ctx) { + ChannelPipeline pipeline = ctx.channel().parent().pipeline(); + pipeline.fireChannelRead(new DefaultHttp2PingFrame(0, true)); + pipeline.fireChannelReadComplete(); + } + @Override public void channelReadComplete(ChannelHandlerContext ctx) { - ctx.fireChannelRead(new DefaultHttp2DataFrame(Unpooled.EMPTY_BUFFER, true)); + if (ctx instanceof QuicStreamChannel) { + ctx.fireChannelRead(new DefaultHttp2DataFrame(Unpooled.EMPTY_BUFFER, true)); + } else { + ctx.fireChannelReadComplete(); + } } @Override @@ -80,11 +110,38 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) return; } ctx.write(new DefaultHttp3DataFrame(frame.content()), promise); + } else if (msg instanceof Http2PingFrame) { + sendPing((QuicChannel) ctx.channel()); } else { ctx.write(msg, promise); } } + private void sendPing(QuicChannel channel) { + Http3.newRequestStream(channel, new Http3RequestStreamInitializer() { + @Override + protected void initRequestStream(QuicStreamChannel ch) { + ch.pipeline().addLast(INSTANCE); + } + }) + .addListener(future -> { + if (future.isSuccess()) { + QuicStreamChannel streamChannel = (QuicStreamChannel) future.getNow(); + + Http3Headers header = new DefaultHttp3Headers(false); + header.set(PseudoHeaderName.METHOD.value(), HttpMethods.OPTIONS.name()); + header.set(PseudoHeaderName.PATH.value(), "*"); + header.set(PseudoHeaderName.SCHEME.value(), HttpConstants.HTTPS); + header.set(Constants.TRI_PING, "0"); + + streamChannel.write(new DefaultHttp3HeadersFrame(header)); + streamChannel.shutdownOutput(); + } else { + LOGGER.warn(TRANSPORT_FAILED_RECONNECT, "Failed to send ping frame", future.cause()); + } + }); + } + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (cause instanceof Http3Exception) { diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3TripleServerConnectionHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3TripleServerConnectionHandler.java new file mode 100644 index 00000000000..afcd19e48c3 --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3TripleServerConnectionHandler.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.protocol.tri.h3; + +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.incubator.codec.http3.Http3GoAwayFrame; +import io.netty.util.ReferenceCountUtil; + +public class Http3TripleServerConnectionHandler extends ChannelDuplexHandler { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof Http3GoAwayFrame) { + ReferenceCountUtil.release(msg); + return; + } + super.channelRead(ctx, msg); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + super.channelInactive(ctx); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + super.exceptionCaught(ctx, cause); + } + + @Override + public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + super.close(ctx, promise); + } +} diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/negotiation/AutoSwitchConnectionClient.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/negotiation/AutoSwitchConnectionClient.java index baffe375a12..2431cb9b463 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/negotiation/AutoSwitchConnectionClient.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/negotiation/AutoSwitchConnectionClient.java @@ -18,7 +18,6 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.constants.CommonConstants; -import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; import org.apache.dubbo.common.utils.ClassUtils; import org.apache.dubbo.common.utils.NamedThreadFactory; import org.apache.dubbo.common.utils.NetUtils; @@ -33,11 +32,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_ERROR_CLOSE_CLIENT; -import static org.apache.dubbo.common.logger.LoggerFactory.getErrorTypeAwareLogger; public class AutoSwitchConnectionClient extends AbstractConnectionClient { - private static final ErrorTypeAwareLogger LOGGER = getErrorTypeAwareLogger(AutoSwitchConnectionClient.class); private static final int MAX_RETRIES = 8; private final URL url; @@ -58,8 +55,8 @@ public AutoSwitchConnectionClient(URL url, AbstractConnectionClient connectionCl ClassLoader tccl = Thread.currentThread().getContextClassLoader(); connectionClient.addConnectedListener(() -> ClassUtils.runWith(tccl, () -> executor.execute(this::negotiate))); increase(); - if (LOGGER.isInfoEnabled()) { - LOGGER.info( + if (logger.isInfoEnabled()) { + logger.info( "Start HTTP/3 AutoSwitchConnectionClient {} connect to the server {}", NetUtils.getLocalAddress(), url.toInetSocketAddress()); @@ -80,14 +77,14 @@ private void negotiate() { if (clientCall == null) { clientCall = new NegotiateClientCall(connectionClient, executor); } - LOGGER.info("Start HTTP/3 negotiation for [{}]", getBaseUrl()); + logger.info("Start HTTP/3 negotiation for [{}]", getBaseUrl()); clientCall.start(url).whenComplete((altSvc, t) -> { if (t == null) { if (altSvc.contains("h3=")) { negotiateSuccess(); return; } - LOGGER.info( + logger.info( "HTTP/3 negotiation succeed, but provider reply alt-svc='{}' not support HTTP/3 for [{}]", altSvc, getBaseUrl()); @@ -101,7 +98,7 @@ private void negotiate() { private void negotiateSuccess() { negotiated = true; - LOGGER.info("HTTP/3 negotiation succeed for [{}], create http3 client", getBaseUrl()); + logger.info("HTTP/3 negotiation succeed for [{}], create http3 client", getBaseUrl()); http3ConnectionClient = Helper.createHttp3Client(url, connectionClient.getDelegateHandler()); http3ConnectionClient.addConnectedListener(() -> setHttp3Connected(true)); http3ConnectionClient.addDisconnectedListener(() -> setHttp3Connected(false)); @@ -111,11 +108,11 @@ private void negotiateSuccess() { private void reScheduleNegotiate(Throwable t) { if (attempt++ < MAX_RETRIES) { int delay = 1 << attempt + 2; - LOGGER.info("HTTP/3 negotiation failed, retry after {} seconds for [{}]", delay, getBaseUrl(), t); + logger.info("HTTP/3 negotiation failed, retry after {} seconds for [{}]", delay, getBaseUrl(), t); executor.schedule(this::negotiate, delay, TimeUnit.SECONDS); return; } - LOGGER.warn( + logger.warn( PROTOCOL_ERROR_CLOSE_CLIENT, "", "", @@ -133,7 +130,7 @@ private void negotiateEnd() { private void setHttp3Connected(boolean http3Connected) { this.http3Connected = http3Connected; - LOGGER.info("Switch protocol to {} for [{}]", http3Connected ? "HTTP/3" : "HTTP/2", url.toString("")); + logger.info("Switch protocol to {} for [{}]", http3Connected ? "HTTP/3" : "HTTP/2", url.toString("")); } public boolean isHttp3Connected() { @@ -155,13 +152,13 @@ public boolean release() { try { connectionClient.release(); } catch (Throwable t) { - LOGGER.warn(PROTOCOL_ERROR_CLOSE_CLIENT, "", "", t.getMessage(), t); + logger.warn(PROTOCOL_ERROR_CLOSE_CLIENT, "", "", t.getMessage(), t); } if (http3ConnectionClient != null) { try { http3ConnectionClient.release(); } catch (Throwable t) { - LOGGER.warn(PROTOCOL_ERROR_CLOSE_CLIENT, "", "", t.getMessage(), t); + logger.warn(PROTOCOL_ERROR_CLOSE_CLIENT, "", "", t.getMessage(), t); } } return true; diff --git a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp3ProtocolTest.java b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp3ProtocolTest.java index 342825c3d31..b53fc27f350 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp3ProtocolTest.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp3ProtocolTest.java @@ -108,7 +108,7 @@ void testDemoProtocol() throws Exception { MockStreamObserver outboundMessageSubscriber1 = new MockStreamObserver(); greeterProxy.serverStream(REQUEST_MSG, outboundMessageSubscriber1); outboundMessageSubscriber1.getLatch().await(3000, TimeUnit.MILLISECONDS); - Assertions.assertEquals(outboundMessageSubscriber1.getOnNextData(), REQUEST_MSG); + Assertions.assertEquals(REQUEST_MSG, outboundMessageSubscriber1.getOnNextData()); Assertions.assertTrue(outboundMessageSubscriber1.isOnCompleted()); // 3. test bidirectionalStream @@ -118,7 +118,7 @@ void testDemoProtocol() throws Exception { inboundMessageObserver.onCompleted(); outboundMessageSubscriber2.getLatch().await(3000, TimeUnit.MILLISECONDS); // verify client - Assertions.assertEquals(outboundMessageSubscriber2.getOnNextData(), IGreeter.SERVER_MSG); + Assertions.assertEquals(IGreeter.SERVER_MSG, outboundMessageSubscriber2.getOnNextData()); Assertions.assertTrue(outboundMessageSubscriber2.isOnCompleted()); // verify server MockStreamObserver serverOutboundMessageSubscriber = (MockStreamObserver) serviceImpl.getMockStreamObserver();