Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTP3 bugfix #14955

Merged
merged 3 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package org.apache.dubbo.remoting.api.connection;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.api.WireProtocol;
Expand All @@ -32,9 +30,6 @@

public abstract class AbstractConnectionClient extends AbstractClient {

private static final ErrorTypeAwareLogger logger =
LoggerFactory.getErrorTypeAwareLogger(AbstractConnectionClient.class);

protected WireProtocol protocol;

protected InetSocketAddress remote;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.common.utils.NetUtils;
Expand Down Expand Up @@ -55,8 +53,6 @@
*/
public abstract class AbstractClient extends AbstractEndpoint implements Client {

private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AbstractClient.class);

private final Lock connectLock = new ReentrantLock();

private final boolean needReconnect;
Expand Down Expand Up @@ -148,7 +144,7 @@ protected AbstractClient() {
private void initExecutor(URL url) {
ExecutorRepository executorRepository = ExecutorRepository.getInstance(url.getOrDefaultApplicationModel());

/**
/*
* Consumer's executor is shared globally, provider ip doesn't need to be part of the thread name.
*
* Instance of url is InstanceAddressURL, so addParameter actually adds parameters into ServiceInstance,
Expand Down Expand Up @@ -415,36 +411,26 @@ public String toString() {

/**
* Open client.
*
* @throws Throwable
*/
protected abstract void doOpen() throws Throwable;

/**
* Close client.
*
* @throws Throwable
*/
protected abstract void doClose() throws Throwable;

/**
* Connect to server.
*
* @throws Throwable
*/
protected abstract void doConnect() throws Throwable;

/**
* disConnect to server.
*
* @throws Throwable
*/
protected abstract void doDisConnect() throws Throwable;

/**
* Get the connected channel.
*
* @return channel
*/
protected abstract Channel getChannel();
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
*/
public abstract class AbstractEndpoint extends AbstractPeer implements Resetable {

private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AbstractEndpoint.class);
protected final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(getClass());

private Codec2 codec;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package org.apache.dubbo.remoting.transport;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.common.utils.ExecutorUtil;
Expand All @@ -45,7 +43,7 @@
* AbstractServer
*/
public abstract class AbstractServer extends AbstractEndpoint implements RemotingServer {
private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AbstractServer.class);

private Set<ExecutorService> executors = new ConcurrentHashSet<>();
private InetSocketAddress localAddress;
private InetSocketAddress bindAddress;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.remoting.http3.netty4;

public final class Constants {

public static final String PIPELINE_CONFIGURATOR_KEY = "http3PipelineConfigurator";
public static final CharSequence TRI_PING = "tri-ping";

private Constants() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.dubbo.remoting.http3.netty4;

import org.apache.dubbo.common.io.StreamUtils;
import org.apache.dubbo.remoting.http12.HttpStatus;
import org.apache.dubbo.remoting.http12.h2.Http2Header;
import org.apache.dubbo.remoting.http12.h2.Http2InputMessageFrame;
import org.apache.dubbo.remoting.http12.h2.Http2MetadataFrame;
Expand All @@ -34,22 +35,40 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.Http2Headers.PseudoHeaderName;
import io.netty.incubator.codec.http3.DefaultHttp3DataFrame;
import io.netty.incubator.codec.http3.DefaultHttp3Headers;
import io.netty.incubator.codec.http3.DefaultHttp3HeadersFrame;
import io.netty.incubator.codec.http3.Http3DataFrame;
import io.netty.incubator.codec.http3.Http3Headers;
import io.netty.incubator.codec.http3.Http3HeadersFrame;
import io.netty.incubator.codec.http3.Http3RequestStreamInboundHandler;
import io.netty.incubator.codec.quic.QuicStreamChannel;

import static org.apache.dubbo.remoting.http3.netty4.Constants.TRI_PING;

@Sharable
public class NettyHttp3FrameCodec extends Http3RequestStreamInboundHandler implements ChannelOutboundHandler {

public static final NettyHttp3FrameCodec INSTANCE = new NettyHttp3FrameCodec();

@Override
protected void channelRead(ChannelHandlerContext ctx, Http3HeadersFrame frame) {
ctx.fireChannelRead(new Http2MetadataFrame(getStreamId(ctx), new DefaultHttpHeaders(frame.headers()), false));
Http3Headers headers = frame.headers();
if (headers.contains(TRI_PING)) {
pingReceived(ctx);
return;
}

ctx.fireChannelRead(new Http2MetadataFrame(getStreamId(ctx), new DefaultHttpHeaders(headers), false));
}

private void pingReceived(ChannelHandlerContext ctx) {
Http3Headers pongHeader = new DefaultHttp3Headers(false);
pongHeader.set(TRI_PING, "0");
pongHeader.set(PseudoHeaderName.STATUS.value(), HttpStatus.OK.getStatusString());
ctx.write(new DefaultHttp3HeadersFrame(pongHeader));
ctx.close();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

import static java.util.concurrent.TimeUnit.MILLISECONDS;

final class Helper {
final class Http3Helper {

@SuppressWarnings("unchecked")
static <T extends QuicCodecBuilder<T>> T configCodec(QuicCodecBuilder<T> builder, URL url) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,48 +21,50 @@
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.http3.Http3SslContexts;
import org.apache.dubbo.remoting.utils.UrlUtils;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.incubator.codec.http3.Http3;
import io.netty.incubator.codec.http3.Http3ClientConnectionHandler;
import io.netty.incubator.codec.quic.QuicChannel;
import io.netty.incubator.codec.quic.QuicChannelBootstrap;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.dubbo.remoting.http3.netty4.Constants.PIPELINE_CONFIGURATOR_KEY;

public final class NettyHttp3ConnectionClient extends AbstractNettyConnectionClient {

private Consumer<ChannelPipeline> pipelineConfigurator;
private AtomicReference<io.netty.channel.Channel> datagramChannel;

private QuicChannelBootstrap bootstrap;

public NettyHttp3ConnectionClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
}

@Override
@SuppressWarnings("unchecked")
protected void initConnectionClient() {
super.initConnectionClient();
datagramChannel = new AtomicReference<>();
pipelineConfigurator = (Consumer<ChannelPipeline>) getUrl().getAttribute(PIPELINE_CONFIGURATOR_KEY);
Objects.requireNonNull(pipelineConfigurator, "pipelineConfigurator should be set");
}

@Override
protected void initBootstrap() throws Exception {
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
io.netty.channel.ChannelHandler codec = Helper.configCodec(Http3.newQuicClientCodecBuilder(), getUrl())
.maxIdleTimeout(idleTimeout, MILLISECONDS)
io.netty.channel.ChannelHandler codec = Http3Helper.configCodec(Http3.newQuicClientCodecBuilder(), getUrl())
.sslContext(Http3SslContexts.buildClientSslContext(getUrl()))
.build();
io.netty.channel.Channel nettyDatagramChannel = new Bootstrap()
Expand All @@ -81,17 +83,15 @@ protected void initChannel(NioDatagramChannel ch) {
datagramChannel.set(nettyDatagramChannel);
nettyDatagramChannel.closeFuture().addListener(channelFuture -> datagramChannel.set(null));

int heartbeat = UrlUtils.getHeartbeat(getUrl());
NettyConnectionHandler connectionHandler = new NettyConnectionHandler(this);
bootstrap = QuicChannel.newBootstrap(nettyDatagramChannel)
.handler(new ChannelInitializer<QuicChannel>() {
@Override
protected void initChannel(QuicChannel ch) {
ch.pipeline()
.addLast(new IdleStateHandler(heartbeat, 0, 0, MILLISECONDS))
.addLast(Constants.CONNECTION_HANDLER_NAME, connectionHandler)
.addLast(new Http3ClientConnectionHandler());

ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new Http3ClientConnectionHandler());
pipeline.addLast(Constants.CONNECTION_HANDLER_NAME, connectionHandler);
pipelineConfigurator.accept(pipeline);
oxsean marked this conversation as resolved.
Show resolved Hide resolved
ch.closeFuture().addListener(channelFuture -> clearNettyChannel());
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,104 +18,78 @@

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.http12.netty4.HttpWriteQueueHandler;
import org.apache.dubbo.remoting.http3.Http3SslContexts;
import org.apache.dubbo.remoting.http3.netty4.NettyHttp3FrameCodec;
import org.apache.dubbo.remoting.http3.netty4.NettyHttp3ProtocolSelectorHandler;
import org.apache.dubbo.remoting.transport.AbstractServer;
import org.apache.dubbo.remoting.transport.dispatcher.ChannelHandlers;
import org.apache.dubbo.remoting.utils.UrlUtils;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.model.ScopeModelUtil;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.incubator.codec.http3.Http3;
import io.netty.incubator.codec.http3.Http3ServerConnectionHandler;
import io.netty.incubator.codec.quic.InsecureQuicTokenHandler;
import io.netty.incubator.codec.quic.QuicChannel;
import io.netty.incubator.codec.quic.QuicStreamChannel;
import io.netty.util.concurrent.Future;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_CLOSE;
import static org.apache.dubbo.remoting.Constants.EVENT_LOOP_BOSS_POOL_NAME;
import static org.apache.dubbo.remoting.http3.netty4.Constants.PIPELINE_CONFIGURATOR_KEY;

public class NettyHttp3Server extends AbstractServer {

private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(NettyHttp3Server.class);

private Map<String, Channel> channels;
private Bootstrap bootstrap;
private EventLoopGroup bossGroup;
private io.netty.channel.Channel channel;

private final Consumer<ChannelPipeline> pipelineConfigurator;
private final int serverShutdownTimeoutMills;

@SuppressWarnings("unchecked")
public NettyHttp3Server(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, url));
pipelineConfigurator = (Consumer<ChannelPipeline>) getUrl().getAttribute(PIPELINE_CONFIGURATOR_KEY);
Objects.requireNonNull(pipelineConfigurator, "pipelineConfigurator should be set");
serverShutdownTimeoutMills = ConfigurationUtils.getServerShutdownTimeout(getUrl().getOrDefaultModuleModel());
}

@Override
protected void doOpen() throws Throwable {
bootstrap = new Bootstrap();

bossGroup = NettyEventLoopFactory.eventLoopGroup(1, EVENT_LOOP_BOSS_POOL_NAME);

NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();

FrameworkModel frameworkModel = ScopeModelUtil.getFrameworkModel(getUrl().getScopeModel());
NettyHttp3ProtocolSelectorHandler selectorHandler =
new NettyHttp3ProtocolSelectorHandler(getUrl(), frameworkModel);

int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
io.netty.channel.ChannelHandler codec = Helper.configCodec(Http3.newQuicServerCodecBuilder(), getUrl())
.sslContext(Http3SslContexts.buildServerSslContext(getUrl()))
.maxIdleTimeout(idleTimeout, MILLISECONDS)
.tokenHandler(InsecureQuicTokenHandler.INSTANCE)
.handler(new ChannelInitializer<QuicChannel>() {
@Override
protected void initChannel(QuicChannel ch) {
ch.pipeline()
.addLast(nettyServerHandler)
.addLast(new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast(new Http3ServerConnectionHandler(new ChannelInitializer<QuicStreamChannel>() {
@Override
protected void initChannel(QuicStreamChannel ch) {
ch.pipeline()
.addLast(NettyHttp3FrameCodec.INSTANCE)
.addLast(new HttpWriteQueueHandler())
.addLast(selectorHandler);
}
}));
}
})
.build();

// bind
try {
ChannelFuture channelFuture = bootstrap
.group(bossGroup)
.channel(NioDatagramChannel.class)
.handler(codec)
.handler(Http3Helper.configCodec(Http3.newQuicServerCodecBuilder(), getUrl())
.sslContext(Http3SslContexts.buildServerSslContext(getUrl()))
.tokenHandler(InsecureQuicTokenHandler.INSTANCE)
.handler(new ChannelInitializer<QuicChannel>() {
@Override
protected void initChannel(QuicChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipelineConfigurator.accept(pipeline);
oxsean marked this conversation as resolved.
Show resolved Hide resolved
pipeline.addLast(nettyServerHandler);
}
})
.build())
.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
Expand Down
Loading
Loading