Skip to content

Commit

Permalink
HTTP3 bugfix (#14955)
Browse files Browse the repository at this point in the history
* HTTP3 bugfix

* Fix NPE when address is null

---------

Co-authored-by: xiaosheng <[email protected]>
  • Loading branch information
oxsean and songxiaosheng authored Dec 6, 2024
1 parent 21f5e1d commit 9633296
Show file tree
Hide file tree
Showing 27 changed files with 406 additions and 271 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package org.apache.dubbo.remoting.api.connection;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.api.WireProtocol;
Expand All @@ -32,9 +30,6 @@

public abstract class AbstractConnectionClient extends AbstractClient {

private static final ErrorTypeAwareLogger logger =
LoggerFactory.getErrorTypeAwareLogger(AbstractConnectionClient.class);

protected WireProtocol protocol;

protected InetSocketAddress remote;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.common.utils.NetUtils;
Expand Down Expand Up @@ -55,8 +53,6 @@
*/
public abstract class AbstractClient extends AbstractEndpoint implements Client {

private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AbstractClient.class);

private final Lock connectLock = new ReentrantLock();

private final boolean needReconnect;
Expand Down Expand Up @@ -148,7 +144,7 @@ protected AbstractClient() {
private void initExecutor(URL url) {
ExecutorRepository executorRepository = ExecutorRepository.getInstance(url.getOrDefaultApplicationModel());

/**
/*
* Consumer's executor is shared globally, provider ip doesn't need to be part of the thread name.
*
* Instance of url is InstanceAddressURL, so addParameter actually adds parameters into ServiceInstance,
Expand Down Expand Up @@ -415,36 +411,26 @@ public String toString() {

/**
* Open client.
*
* @throws Throwable
*/
protected abstract void doOpen() throws Throwable;

/**
* Close client.
*
* @throws Throwable
*/
protected abstract void doClose() throws Throwable;

/**
* Connect to server.
*
* @throws Throwable
*/
protected abstract void doConnect() throws Throwable;

/**
* disConnect to server.
*
* @throws Throwable
*/
protected abstract void doDisConnect() throws Throwable;

/**
* Get the connected channel.
*
* @return channel
*/
protected abstract Channel getChannel();
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
*/
public abstract class AbstractEndpoint extends AbstractPeer implements Resetable {

private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AbstractEndpoint.class);
protected final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(getClass());

private Codec2 codec;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package org.apache.dubbo.remoting.transport;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.common.utils.ExecutorUtil;
Expand All @@ -45,7 +43,7 @@
* AbstractServer
*/
public abstract class AbstractServer extends AbstractEndpoint implements RemotingServer {
private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AbstractServer.class);

private Set<ExecutorService> executors = new ConcurrentHashSet<>();
private InetSocketAddress localAddress;
private InetSocketAddress bindAddress;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.remoting.http3.netty4;

public final class Constants {

public static final String PIPELINE_CONFIGURATOR_KEY = "http3PipelineConfigurator";
public static final CharSequence TRI_PING = "tri-ping";

private Constants() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.dubbo.remoting.http3.netty4;

import org.apache.dubbo.common.io.StreamUtils;
import org.apache.dubbo.remoting.http12.HttpStatus;
import org.apache.dubbo.remoting.http12.h2.Http2Header;
import org.apache.dubbo.remoting.http12.h2.Http2InputMessageFrame;
import org.apache.dubbo.remoting.http12.h2.Http2MetadataFrame;
Expand All @@ -34,22 +35,40 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.Http2Headers.PseudoHeaderName;
import io.netty.incubator.codec.http3.DefaultHttp3DataFrame;
import io.netty.incubator.codec.http3.DefaultHttp3Headers;
import io.netty.incubator.codec.http3.DefaultHttp3HeadersFrame;
import io.netty.incubator.codec.http3.Http3DataFrame;
import io.netty.incubator.codec.http3.Http3Headers;
import io.netty.incubator.codec.http3.Http3HeadersFrame;
import io.netty.incubator.codec.http3.Http3RequestStreamInboundHandler;
import io.netty.incubator.codec.quic.QuicStreamChannel;

import static org.apache.dubbo.remoting.http3.netty4.Constants.TRI_PING;

@Sharable
public class NettyHttp3FrameCodec extends Http3RequestStreamInboundHandler implements ChannelOutboundHandler {

public static final NettyHttp3FrameCodec INSTANCE = new NettyHttp3FrameCodec();

@Override
protected void channelRead(ChannelHandlerContext ctx, Http3HeadersFrame frame) {
ctx.fireChannelRead(new Http2MetadataFrame(getStreamId(ctx), new DefaultHttpHeaders(frame.headers()), false));
Http3Headers headers = frame.headers();
if (headers.contains(TRI_PING)) {
pingReceived(ctx);
return;
}

ctx.fireChannelRead(new Http2MetadataFrame(getStreamId(ctx), new DefaultHttpHeaders(headers), false));
}

private void pingReceived(ChannelHandlerContext ctx) {
Http3Headers pongHeader = new DefaultHttp3Headers(false);
pongHeader.set(TRI_PING, "0");
pongHeader.set(PseudoHeaderName.STATUS.value(), HttpStatus.OK.getStatusString());
ctx.write(new DefaultHttp3HeadersFrame(pongHeader));
ctx.close();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

import static java.util.concurrent.TimeUnit.MILLISECONDS;

final class Helper {
final class Http3Helper {

@SuppressWarnings("unchecked")
static <T extends QuicCodecBuilder<T>> T configCodec(QuicCodecBuilder<T> builder, URL url) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,48 +21,50 @@
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.http3.Http3SslContexts;
import org.apache.dubbo.remoting.utils.UrlUtils;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.incubator.codec.http3.Http3;
import io.netty.incubator.codec.http3.Http3ClientConnectionHandler;
import io.netty.incubator.codec.quic.QuicChannel;
import io.netty.incubator.codec.quic.QuicChannelBootstrap;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.dubbo.remoting.http3.netty4.Constants.PIPELINE_CONFIGURATOR_KEY;

public final class NettyHttp3ConnectionClient extends AbstractNettyConnectionClient {

private Consumer<ChannelPipeline> pipelineConfigurator;
private AtomicReference<io.netty.channel.Channel> datagramChannel;

private QuicChannelBootstrap bootstrap;

public NettyHttp3ConnectionClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
}

@Override
@SuppressWarnings("unchecked")
protected void initConnectionClient() {
super.initConnectionClient();
datagramChannel = new AtomicReference<>();
pipelineConfigurator = (Consumer<ChannelPipeline>) getUrl().getAttribute(PIPELINE_CONFIGURATOR_KEY);
Objects.requireNonNull(pipelineConfigurator, "pipelineConfigurator should be set");
}

@Override
protected void initBootstrap() throws Exception {
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
io.netty.channel.ChannelHandler codec = Helper.configCodec(Http3.newQuicClientCodecBuilder(), getUrl())
.maxIdleTimeout(idleTimeout, MILLISECONDS)
io.netty.channel.ChannelHandler codec = Http3Helper.configCodec(Http3.newQuicClientCodecBuilder(), getUrl())
.sslContext(Http3SslContexts.buildClientSslContext(getUrl()))
.build();
io.netty.channel.Channel nettyDatagramChannel = new Bootstrap()
Expand All @@ -81,17 +83,15 @@ protected void initChannel(NioDatagramChannel ch) {
datagramChannel.set(nettyDatagramChannel);
nettyDatagramChannel.closeFuture().addListener(channelFuture -> datagramChannel.set(null));

int heartbeat = UrlUtils.getHeartbeat(getUrl());
NettyConnectionHandler connectionHandler = new NettyConnectionHandler(this);
bootstrap = QuicChannel.newBootstrap(nettyDatagramChannel)
.handler(new ChannelInitializer<QuicChannel>() {
@Override
protected void initChannel(QuicChannel ch) {
ch.pipeline()
.addLast(new IdleStateHandler(heartbeat, 0, 0, MILLISECONDS))
.addLast(Constants.CONNECTION_HANDLER_NAME, connectionHandler)
.addLast(new Http3ClientConnectionHandler());

ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new Http3ClientConnectionHandler());
pipeline.addLast(Constants.CONNECTION_HANDLER_NAME, connectionHandler);
pipelineConfigurator.accept(pipeline);
ch.closeFuture().addListener(channelFuture -> clearNettyChannel());
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,104 +18,78 @@

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.http12.netty4.HttpWriteQueueHandler;
import org.apache.dubbo.remoting.http3.Http3SslContexts;
import org.apache.dubbo.remoting.http3.netty4.NettyHttp3FrameCodec;
import org.apache.dubbo.remoting.http3.netty4.NettyHttp3ProtocolSelectorHandler;
import org.apache.dubbo.remoting.transport.AbstractServer;
import org.apache.dubbo.remoting.transport.dispatcher.ChannelHandlers;
import org.apache.dubbo.remoting.utils.UrlUtils;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.model.ScopeModelUtil;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.incubator.codec.http3.Http3;
import io.netty.incubator.codec.http3.Http3ServerConnectionHandler;
import io.netty.incubator.codec.quic.InsecureQuicTokenHandler;
import io.netty.incubator.codec.quic.QuicChannel;
import io.netty.incubator.codec.quic.QuicStreamChannel;
import io.netty.util.concurrent.Future;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_CLOSE;
import static org.apache.dubbo.remoting.Constants.EVENT_LOOP_BOSS_POOL_NAME;
import static org.apache.dubbo.remoting.http3.netty4.Constants.PIPELINE_CONFIGURATOR_KEY;

public class NettyHttp3Server extends AbstractServer {

private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(NettyHttp3Server.class);

private Map<String, Channel> channels;
private Bootstrap bootstrap;
private EventLoopGroup bossGroup;
private io.netty.channel.Channel channel;

private final Consumer<ChannelPipeline> pipelineConfigurator;
private final int serverShutdownTimeoutMills;

@SuppressWarnings("unchecked")
public NettyHttp3Server(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, url));
pipelineConfigurator = (Consumer<ChannelPipeline>) getUrl().getAttribute(PIPELINE_CONFIGURATOR_KEY);
Objects.requireNonNull(pipelineConfigurator, "pipelineConfigurator should be set");
serverShutdownTimeoutMills = ConfigurationUtils.getServerShutdownTimeout(getUrl().getOrDefaultModuleModel());
}

@Override
protected void doOpen() throws Throwable {
bootstrap = new Bootstrap();

bossGroup = NettyEventLoopFactory.eventLoopGroup(1, EVENT_LOOP_BOSS_POOL_NAME);

NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();

FrameworkModel frameworkModel = ScopeModelUtil.getFrameworkModel(getUrl().getScopeModel());
NettyHttp3ProtocolSelectorHandler selectorHandler =
new NettyHttp3ProtocolSelectorHandler(getUrl(), frameworkModel);

int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
io.netty.channel.ChannelHandler codec = Helper.configCodec(Http3.newQuicServerCodecBuilder(), getUrl())
.sslContext(Http3SslContexts.buildServerSslContext(getUrl()))
.maxIdleTimeout(idleTimeout, MILLISECONDS)
.tokenHandler(InsecureQuicTokenHandler.INSTANCE)
.handler(new ChannelInitializer<QuicChannel>() {
@Override
protected void initChannel(QuicChannel ch) {
ch.pipeline()
.addLast(nettyServerHandler)
.addLast(new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast(new Http3ServerConnectionHandler(new ChannelInitializer<QuicStreamChannel>() {
@Override
protected void initChannel(QuicStreamChannel ch) {
ch.pipeline()
.addLast(NettyHttp3FrameCodec.INSTANCE)
.addLast(new HttpWriteQueueHandler())
.addLast(selectorHandler);
}
}));
}
})
.build();

// bind
try {
ChannelFuture channelFuture = bootstrap
.group(bossGroup)
.channel(NioDatagramChannel.class)
.handler(codec)
.handler(Http3Helper.configCodec(Http3.newQuicServerCodecBuilder(), getUrl())
.sslContext(Http3SslContexts.buildServerSslContext(getUrl()))
.tokenHandler(InsecureQuicTokenHandler.INSTANCE)
.handler(new ChannelInitializer<QuicChannel>() {
@Override
protected void initChannel(QuicChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipelineConfigurator.accept(pipeline);
pipeline.addLast(nettyServerHandler);
}
})
.build())
.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
Expand Down
Loading

0 comments on commit 9633296

Please sign in to comment.