Skip to content

Commit

Permalink
HTTP3 bugfix
Browse files Browse the repository at this point in the history
  • Loading branch information
oxsean committed Nov 28, 2024
1 parent a8ea7b6 commit 4d99553
Show file tree
Hide file tree
Showing 28 changed files with 414 additions and 257 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ protected URL setThreadNameIfAbsent(URL url, String executorCacheKey) {

@Override
protected String getProviderKey(URL url) {
if (url.getAttributes().containsKey(SERVICE_EXECUTOR)) {
if (url.hasAttribute(SERVICE_EXECUTOR)) {
return url.getServiceKey();
} else {
return super.getProviderKey(url);
Expand All @@ -55,7 +55,7 @@ protected String getProviderKey(URL url) {

@Override
protected String getProviderKey(ProviderModel providerModel, URL url) {
if (url.getAttributes().containsKey(SERVICE_EXECUTOR)) {
if (url.hasAttribute(SERVICE_EXECUTOR)) {
return providerModel.getServiceKey();
} else {
return super.getProviderKey(url);
Expand All @@ -64,7 +64,7 @@ protected String getProviderKey(ProviderModel providerModel, URL url) {

@Override
protected ExecutorService createExecutor(URL url) {
Object executor = url.getAttributes().get(SERVICE_EXECUTOR);
Object executor = url.getAttribute(SERVICE_EXECUTOR);
if (executor instanceof ExecutorService) {
return (ExecutorService) executor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ private void processServiceExecutor(URL url) {
* and obtained it in IsolationExecutorRepository#createExecutor method
*/
providerModel.getServiceMetadata().addAttribute(SERVICE_EXECUTOR, getExecutor());
url.getAttributes().put(SERVICE_EXECUTOR, getExecutor());
url.putAttribute(SERVICE_EXECUTOR, getExecutor());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package org.apache.dubbo.remoting.api.connection;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.api.WireProtocol;
Expand All @@ -32,9 +30,6 @@

public abstract class AbstractConnectionClient extends AbstractClient {

private static final ErrorTypeAwareLogger logger =
LoggerFactory.getErrorTypeAwareLogger(AbstractConnectionClient.class);

protected WireProtocol protocol;

protected InetSocketAddress remote;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.common.utils.NetUtils;
Expand Down Expand Up @@ -55,8 +53,6 @@
*/
public abstract class AbstractClient extends AbstractEndpoint implements Client {

private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AbstractClient.class);

private final Lock connectLock = new ReentrantLock();

private final boolean needReconnect;
Expand Down Expand Up @@ -148,7 +144,7 @@ protected AbstractClient() {
private void initExecutor(URL url) {
ExecutorRepository executorRepository = ExecutorRepository.getInstance(url.getOrDefaultApplicationModel());

/**
/*
* Consumer's executor is shared globally, provider ip doesn't need to be part of the thread name.
*
* Instance of url is InstanceAddressURL, so addParameter actually adds parameters into ServiceInstance,
Expand Down Expand Up @@ -415,36 +411,26 @@ public String toString() {

/**
* Open client.
*
* @throws Throwable
*/
protected abstract void doOpen() throws Throwable;

/**
* Close client.
*
* @throws Throwable
*/
protected abstract void doClose() throws Throwable;

/**
* Connect to server.
*
* @throws Throwable
*/
protected abstract void doConnect() throws Throwable;

/**
* disConnect to server.
*
* @throws Throwable
*/
protected abstract void doDisConnect() throws Throwable;

/**
* Get the connected channel.
*
* @return channel
*/
protected abstract Channel getChannel();
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
*/
public abstract class AbstractEndpoint extends AbstractPeer implements Resetable {

private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AbstractEndpoint.class);
protected final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(getClass());

private Codec2 codec;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package org.apache.dubbo.remoting.transport;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.common.utils.ExecutorUtil;
Expand All @@ -45,7 +43,7 @@
* AbstractServer
*/
public abstract class AbstractServer extends AbstractEndpoint implements RemotingServer {
private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AbstractServer.class);

private Set<ExecutorService> executors = new ConcurrentHashSet<>();
private InetSocketAddress localAddress;
private InetSocketAddress bindAddress;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.remoting.http3.netty4;

public final class Constants {

public static final String PIPELINE_CONFIGURATOR_KEY = "http3PipelineConfigurator";
public static final CharSequence TRI_PING = "tri-ping";

private Constants() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.dubbo.remoting.http3.netty4;

import org.apache.dubbo.common.io.StreamUtils;
import org.apache.dubbo.remoting.http12.HttpStatus;
import org.apache.dubbo.remoting.http12.h2.Http2Header;
import org.apache.dubbo.remoting.http12.h2.Http2InputMessageFrame;
import org.apache.dubbo.remoting.http12.h2.Http2MetadataFrame;
Expand All @@ -34,22 +35,40 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.Http2Headers.PseudoHeaderName;
import io.netty.incubator.codec.http3.DefaultHttp3DataFrame;
import io.netty.incubator.codec.http3.DefaultHttp3Headers;
import io.netty.incubator.codec.http3.DefaultHttp3HeadersFrame;
import io.netty.incubator.codec.http3.Http3DataFrame;
import io.netty.incubator.codec.http3.Http3Headers;
import io.netty.incubator.codec.http3.Http3HeadersFrame;
import io.netty.incubator.codec.http3.Http3RequestStreamInboundHandler;
import io.netty.incubator.codec.quic.QuicStreamChannel;

import static org.apache.dubbo.remoting.http3.netty4.Constants.TRI_PING;

@Sharable
public class NettyHttp3FrameCodec extends Http3RequestStreamInboundHandler implements ChannelOutboundHandler {

public static final NettyHttp3FrameCodec INSTANCE = new NettyHttp3FrameCodec();

@Override
protected void channelRead(ChannelHandlerContext ctx, Http3HeadersFrame frame) {
ctx.fireChannelRead(new Http2MetadataFrame(getStreamId(ctx), new DefaultHttpHeaders(frame.headers()), false));
Http3Headers headers = frame.headers();
if (headers.contains(TRI_PING)) {
pingReceived(ctx);
return;
}

ctx.fireChannelRead(new Http2MetadataFrame(getStreamId(ctx), new DefaultHttpHeaders(headers), false));
}

private void pingReceived(ChannelHandlerContext ctx) {
Http3Headers pongHeader = new DefaultHttp3Headers(false);
pongHeader.set(TRI_PING, "0");
pongHeader.set(PseudoHeaderName.STATUS.value(), HttpStatus.OK.getStatusString());
ctx.write(new DefaultHttp3HeadersFrame(pongHeader));
ctx.close();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

import static java.util.concurrent.TimeUnit.MILLISECONDS;

final class Helper {
final class Http3Helper {

@SuppressWarnings("unchecked")
static <T extends QuicCodecBuilder<T>> T configCodec(QuicCodecBuilder<T> builder, URL url) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,48 +21,48 @@
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.http3.Http3SslContexts;
import org.apache.dubbo.remoting.utils.UrlUtils;

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.incubator.codec.http3.Http3;
import io.netty.incubator.codec.http3.Http3ClientConnectionHandler;
import io.netty.incubator.codec.quic.QuicChannel;
import io.netty.incubator.codec.quic.QuicChannelBootstrap;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.dubbo.remoting.http3.netty4.Constants.PIPELINE_CONFIGURATOR_KEY;

public final class NettyHttp3ConnectionClient extends AbstractNettyConnectionClient {

private AtomicReference<io.netty.channel.Channel> datagramChannel;

private QuicChannelBootstrap bootstrap;
private Consumer<ChannelPipeline> pipelineConfigurator;

public NettyHttp3ConnectionClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
}

@Override
@SuppressWarnings("unchecked")
protected void initConnectionClient() {
super.initConnectionClient();
datagramChannel = new AtomicReference<>();
pipelineConfigurator = (Consumer<ChannelPipeline>) getUrl().getAttribute(PIPELINE_CONFIGURATOR_KEY);
}

@Override
protected void initBootstrap() throws Exception {
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
io.netty.channel.ChannelHandler codec = Helper.configCodec(Http3.newQuicClientCodecBuilder(), getUrl())
.maxIdleTimeout(idleTimeout, MILLISECONDS)
io.netty.channel.ChannelHandler codec = Http3Helper.configCodec(Http3.newQuicClientCodecBuilder(), getUrl())
.sslContext(Http3SslContexts.buildClientSslContext(getUrl()))
.build();
io.netty.channel.Channel nettyDatagramChannel = new Bootstrap()
Expand All @@ -81,17 +81,15 @@ protected void initChannel(NioDatagramChannel ch) {
datagramChannel.set(nettyDatagramChannel);
nettyDatagramChannel.closeFuture().addListener(channelFuture -> datagramChannel.set(null));

int heartbeat = UrlUtils.getHeartbeat(getUrl());
NettyConnectionHandler connectionHandler = new NettyConnectionHandler(this);
bootstrap = QuicChannel.newBootstrap(nettyDatagramChannel)
.handler(new ChannelInitializer<QuicChannel>() {
@Override
protected void initChannel(QuicChannel ch) {
ch.pipeline()
.addLast(new IdleStateHandler(heartbeat, 0, 0, MILLISECONDS))
.addLast(Constants.CONNECTION_HANDLER_NAME, connectionHandler)
.addLast(new Http3ClientConnectionHandler());

ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new Http3ClientConnectionHandler());
pipeline.addLast(Constants.CONNECTION_HANDLER_NAME, connectionHandler);
pipelineConfigurator.accept(pipeline);
ch.closeFuture().addListener(channelFuture -> clearNettyChannel());
}
})
Expand Down
Loading

0 comments on commit 4d99553

Please sign in to comment.