Skip to content

Commit

Permalink
feat: support io_uring (#366)
Browse files Browse the repository at this point in the history
* feat: support io_uring
  • Loading branch information
funky-eyes authored Jan 22, 2025
1 parent 8a1fe47 commit b356598
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 12 deletions.
8 changes: 7 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
<maven.surefire.argLine></maven.surefire.argLine>
<maven.surefire.excludes></maven.surefire.excludes>
<maven.surefire.plugin>2.18.1</maven.surefire.plugin>
<netty-io-uring.version>0.0.25.Final</netty-io-uring.version>
<netty.version>4.1.42.Final</netty.version>
<project.encoding>UTF-8</project.encoding>
<slf4j.version>1.7.21</slf4j.version>
Expand All @@ -107,7 +108,12 @@
<artifactId>sofa-common-tools</artifactId>
<version>${sofa.common.tools}</version>
</dependency>

<dependency>
<groupId>io.netty.incubator</groupId>
<artifactId>netty-incubator-transport-native-io_uring</artifactId>
<version>${netty-io-uring.version}</version>
<scope>provided</scope>
</dependency>
<!-- provided scope -->
<dependency>
<groupId>com.alipay.sofa</groupId>
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/com/alipay/remoting/config/ConfigManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ public static boolean netty_epoll_lt_enabled() {
return getBool(Configs.NETTY_EPOLL_LT, Configs.NETTY_EPOLL_LT_DEFAULT);
}

public static boolean netty_io_uring() {
return getBool(Configs.NETTY_IO_URING_SWITCH, Configs.NETTY_IO_URING_SWITCH_DEFAULT);
}

// ~~~ properties for idle
public static boolean tcp_idle_switch() {
return getBool(Configs.TCP_IDLE_SWITCH, Configs.TCP_IDLE_SWITCH_DEFAULT);
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/com/alipay/remoting/config/Configs.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ public class Configs {
public static final String NETTY_EPOLL_SWITCH = "bolt.netty.epoll.switch";
public static final String NETTY_EPOLL_SWITCH_DEFAULT = "true";

public static final String NETTY_IO_URING_SWITCH = "bolt.netty.io_uring.switch";
public static final String NETTY_IO_URING_SWITCH_DEFAULT = "false";

/** Netty epoll level trigger enabled */
public static final String NETTY_EPOLL_LT = "bolt.netty.epoll.lt";
public static final String NETTY_EPOLL_LT_DEFAULT = "true";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,8 +410,9 @@ public void run() {
RpcRequestProcessor.this.doProcess(ctx, msg);
} catch (Throwable e) {
//protect the thread running this task
String remotingAddress = RemotingUtil.parseRemoteAddress(ctx.getChannelContext()
.channel());
String remotingAddress = ctx.getChannelContext() != null
&& ctx.getChannelContext().channel() != null ? RemotingUtil
.parseRemoteAddress(ctx.getChannelContext().channel()) : null;
String errMsg = "Exception caught when process rpc request command in RpcRequestProcessor, Id="
+ msg.getId();
logger.error(errMsg + "! Invoke source address is [" + remotingAddress + "].", e);
Expand Down
21 changes: 15 additions & 6 deletions src/main/java/com/alipay/remoting/util/NettyEventLoopUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.incubator.channel.uring.IOUringEventLoopGroup;
import io.netty.incubator.channel.uring.IOUringServerSocketChannel;
import io.netty.incubator.channel.uring.IOUringSocketChannel;

/**
* Utils for netty EventLoop
Expand All @@ -43,7 +46,10 @@
public class NettyEventLoopUtil {

/** check whether epoll enabled, and it would not be changed during runtime. */
private static boolean epollEnabled = ConfigManager.netty_epoll() && Epoll.isAvailable();
private static final boolean epollEnabled = ConfigManager.netty_epoll()
&& Epoll.isAvailable();

private static final boolean ioUringEnabled = ConfigManager.netty_io_uring();

/**
* Create the right event loop according to current platform and system property, fallback to NIO when epoll not enabled.
Expand All @@ -53,22 +59,25 @@ public class NettyEventLoopUtil {
* @return an EventLoopGroup suitable for the current platform
*/
public static EventLoopGroup newEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
return epollEnabled ? new EpollEventLoopGroup(nThreads, threadFactory)
: new NioEventLoopGroup(nThreads, threadFactory);
return ioUringEnabled ? new IOUringEventLoopGroup(nThreads, threadFactory)
: epollEnabled ? new EpollEventLoopGroup(nThreads, threadFactory)
: new NioEventLoopGroup(nThreads, threadFactory);
}

/**
* @return a SocketChannel class suitable for the given EventLoopGroup implementation
*/
public static Class<? extends SocketChannel> getClientSocketChannelClass() {
return epollEnabled ? EpollSocketChannel.class : NioSocketChannel.class;
return ioUringEnabled ? IOUringSocketChannel.class
: epollEnabled ? EpollSocketChannel.class : NioSocketChannel.class;
}

/**
* @return a ServerSocketChannel class suitable for the given EventLoopGroup implementation
*/
public static Class<? extends ServerSocketChannel> getServerSocketChannelClass() {
return epollEnabled ? EpollServerSocketChannel.class : NioServerSocketChannel.class;
return ioUringEnabled ? IOUringServerSocketChannel.class
: epollEnabled ? EpollServerSocketChannel.class : NioServerSocketChannel.class;
}

/**
Expand All @@ -77,7 +86,7 @@ public static Class<? extends ServerSocketChannel> getServerSocketChannelClass()
* @param serverBootstrap server bootstrap
*/
public static void enableTriggeredMode(ServerBootstrap serverBootstrap) {
if (epollEnabled) {
if (!ioUringEnabled && epollEnabled) {
if (ConfigManager.netty_epoll_lt_enabled()) {
serverBootstrap.childOption(EpollChannelOption.EPOLL_MODE,
EpollMode.LEVEL_TRIGGERED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -45,6 +47,8 @@ public class RpcCommandHandlerTest {

private static final CountDownLatch countDownLatch = new CountDownLatch(2);

private static final Logger LOGGER = LoggerFactory.getLogger(RpcCommandHandlerTest.class);

@BeforeClass
public static void beforeClass() {
ConcurrentHashMap<String, UserProcessor<?>> userProcessors = new ConcurrentHashMap<>();
Expand All @@ -65,8 +69,9 @@ public void testHandleCommand() throws Exception {
msg.add(rpcRequestCommand2);
RpcCommandHandler rpcCommandHandler = new RpcCommandHandler(new RpcCommandFactory());
rpcCommandHandler.handleCommand(remotingContext, msg);
countDownLatch.await(10, TimeUnit.SECONDS);
Assert.assertTrue(remotingContextList.size() == 2);
boolean result = countDownLatch.await(15, TimeUnit.SECONDS);
Assert.assertTrue(result);
Assert.assertEquals(2, remotingContextList.size());
Assert.assertTrue(remotingContextList.get(0).getTimeout() != remotingContextList.get(1).getTimeout());
}

Expand All @@ -89,7 +94,7 @@ public boolean isStarted() {

@Override
public BizContext preHandleRequest(RemotingContext remotingCtx, Object request) {
Assert.assertTrue(remotingCtx != remotingContext);
Assert.assertNotSame(remotingCtx, remotingContext);
remotingContextList.add(remotingCtx);
countDownLatch.countDown();
return null;
Expand Down

0 comments on commit b356598

Please sign in to comment.