Skip to content

Commit

Permalink
feature[PanamaUring]
Browse files Browse the repository at this point in the history
1,实现probe以及添加默认操作符合法性校验
2,修复一个潜在的wakeup失败的问题
  • Loading branch information
dreamlike-ocean committed Apr 15, 2024
1 parent a04ec1b commit 08c2a9d
Show file tree
Hide file tree
Showing 11 changed files with 239 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.concurrent.CompletableFuture;

public interface IoUringSelectedReadableFd extends IoUringOperator, NativeFd {

IoUringBufferRing bufferRing();

default CancelableFuture<OwnershipMemory> asyncSelectedRead(int len, int offset) {
Expand All @@ -30,7 +29,7 @@ default CancelableFuture<OwnershipMemory> asyncSelectedRead(int len, int offset)
sqe.setFlags((byte) (sqe.getFlags() | IoUringConstant.IOSQE_BUFFER_SELECT));
sqe.setBufGroup(bufferRing.getBufferGroupId());
})
.thenCompose(cqe -> {
.thenComposeAsync(cqe -> {
int syscallResult = cqe.getRes();
if (syscallResult < 0) {
return CompletableFuture.failedFuture(new SyscallException(syscallResult));
Expand All @@ -40,7 +39,7 @@ default CancelableFuture<OwnershipMemory> asyncSelectedRead(int len, int offset)
IoUringBufferRingElement ringElement = bufferRing.removeBuffer(bid).resultNow();
return CompletableFuture.completedFuture(borrowUringBufferRingElement(ringElement, readLen));
}
});
}, r -> owner().runOnEventLoop(r));
}

default CancelableFuture<IoUringSyscallResult<OwnershipMemory>> asyncSelectedReadResult(int len, int offset) {
Expand All @@ -50,7 +49,8 @@ default CancelableFuture<IoUringSyscallResult<OwnershipMemory>> asyncSelectedRea
sqe.setFlags((byte) (sqe.getFlags() | IoUringConstant.IOSQE_BUFFER_SELECT));
sqe.setBufGroup(bufferRing.getBufferGroupId());
})
.thenApply(cqe -> {
.thenApplyAsync(cqe -> {
//强制制定在eventloop上 防止外部get导致切换线程
IoUringSyscallResult<OwnershipMemory> result;
if (cqe.getRes() < 0) {
result = new IoUringSyscallResult<>(cqe.getRes(), OwnershipMemory.of(MemorySegment.NULL));
Expand All @@ -61,7 +61,7 @@ default CancelableFuture<IoUringSyscallResult<OwnershipMemory>> asyncSelectedRea
result = new IoUringSyscallResult<>(cqe.getRes(), borrowUringBufferRingElement(ringElement, readLen));
}
return result;
});
}, r -> owner().runOnEventLoop(r));

}

Expand Down Expand Up @@ -97,7 +97,7 @@ public MemorySegment resource() {

@Override
public void drop() {
IoUringBufferRingElement waitToRelease = (IoUringBufferRingElement) ELEMENT_VH.compareAndExchange(this, element, (IoUringBufferRingElement)null);
IoUringBufferRingElement waitToRelease = (IoUringBufferRingElement) ELEMENT_VH.compareAndExchange(this, element, (IoUringBufferRingElement) null);
if (waitToRelease == null) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import top.dreamlike.panama.uring.nativelib.Instance;
import top.dreamlike.panama.uring.nativelib.exception.SyscallException;
import top.dreamlike.panama.uring.nativelib.helper.NativeHelper;
import top.dreamlike.panama.uring.nativelib.helper.OSIoUringProbe;
import top.dreamlike.panama.uring.nativelib.libs.LibUring;
import top.dreamlike.panama.uring.nativelib.struct.liburing.*;
import top.dreamlike.panama.uring.nativelib.struct.time.KernelTime64Type;
Expand Down Expand Up @@ -37,6 +38,8 @@

public class IoUringEventLoop extends Thread implements AutoCloseable, Executor {

private static final OSIoUringProbe PROBE = new OSIoUringProbe();

private static final AtomicInteger count = new AtomicInteger(0);
private static final Logger log = LogManager.getLogger(IoUringEventLoop.class);

Expand Down Expand Up @@ -108,7 +111,6 @@ private void initWakeUpFdMultiShot() {
@Override
public void run() {
while (!hasClosed.get()) {
inWait.set(true);
while (true) {
ScheduledTask next = scheduledTasks.peek();
if (next != null && next.deadlineNanos <= System.nanoTime()) {
Expand All @@ -132,7 +134,7 @@ public void run() {
kernelTime64Type.setTv_nsec(duration % 1000000000);
libUring.io_uring_submit_and_wait_timeout(internalRing, cqePtrs, cqeSize, kernelTime64Type, null);
}

inWait.set(true);
processCqes();
}
releaseResource();
Expand Down Expand Up @@ -167,7 +169,7 @@ public <V> CompletableFuture<V> runOnEventLoop(Supplier<V> callable) {
return future;
}

private void runOnEventLoop(Runnable runnable) {
public void runOnEventLoop(Runnable runnable) {
if (Thread.currentThread() == this) {
runWithCatchException(runnable);
} else {
Expand Down Expand Up @@ -212,6 +214,10 @@ private CancelToken fillTemplate(Consumer<IoUringSqe> sqeFunction, Consumer<IoUr
Runnable r = () -> {
IoUringSqe sqe = ioUringGetSqe();
sqeFunction.accept(sqe);
if (NativeHelper.enableOpVersionCheck && sqe.getOpcode() > PROBE.getLastOp()) {
Instance.LIB_URING.io_uring_back_sqe(internalRing);
throw new UnsupportedOperationException(sqe.getOpcode() + " is unsupported");
}
sqe.setUser_data(token);
callBackMap.put(token, new IoUringCompletionCallBack(sqe.getFd(), sqe.getOpcode(), callback));
if (needSubmit) {
Expand Down Expand Up @@ -251,6 +257,12 @@ public CancelToken asyncOperation(Consumer<IoUringSqe> sqeFunction, Consumer<IoU
Runnable r = () -> {
IoUringSqe sqe = ioUringGetSqe();
sqeFunction.accept(sqe);

if (NativeHelper.enableOpVersionCheck && sqe.getOpcode() > PROBE.getLastOp()) {
Instance.LIB_URING.io_uring_back_sqe(internalRing);
throw new UnsupportedOperationException(sqe.getOpcode() + " is unsupported");
}

sqe.setUser_data(token);
if (enableLink) {
sqe.setFlags((byte) (sqe.getFlags() | IoUringConstant.IOSQE_IO_LINK));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package top.dreamlike.panama.uring.nativelib;

import top.dreamlike.panama.generator.proxy.NativeCallGenerator;
import top.dreamlike.panama.generator.proxy.StructProxyGenerator;
import top.dreamlike.panama.uring.nativelib.helper.NativeHelper;
import top.dreamlike.panama.uring.nativelib.libs.LibEpoll;
import top.dreamlike.panama.uring.nativelib.libs.LibJemalloc;
import top.dreamlike.panama.uring.nativelib.libs.LibUring;
Expand All @@ -20,6 +20,7 @@ public class Instance {
public static final LibJemalloc LIB_JEMALLOC = new LibJemalloc() {

private static final LibJemalloc FFI = NATIVE_CALL_GENERATOR.generate(LibJemalloc.class);

@Override
public MemorySegment malloc(long size) {
return FFI.malloc(size).reinterpret(size);
Expand All @@ -43,12 +44,7 @@ public int posix_memalign(MemorySegment memptr, long alignment, long size) {

static {
NATIVE_CALL_GENERATOR.indyMode();
LibUring generate = NATIVE_CALL_GENERATOR.generate(LibUring.class);
if (Boolean.parseBoolean(System.getProperty("enable-detect-os-version", "false"))) {
LIB_URING = NativeHelper.enhanceCheck(generate, LibUring.class);
} else {
LIB_URING = generate;
}
LIB_URING = NATIVE_CALL_GENERATOR.generate(LibUring.class);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,23 @@
import top.dreamlike.panama.uring.eventloop.IoUringEventLoop;
import top.dreamlike.panama.uring.helper.LambdaHelper;
import top.dreamlike.panama.uring.nativelib.Instance;
import top.dreamlike.panama.uring.nativelib.exception.ErrorKernelVersionException;
import top.dreamlike.panama.uring.nativelib.exception.SyscallException;
import top.dreamlike.panama.uring.trait.OwnershipResource;

import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.lang.invoke.VarHandle;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.nio.ByteOrder;
import java.util.List;
import java.util.Optional;
import java.util.function.IntSupplier;

public class NativeHelper {

public static String JAVA_IO_TMPDIR = System.getProperty("java.io.tmpdir");

public static boolean enableOpVersionCheck = System.getProperty("enable-detect-os-version", "true").equalsIgnoreCase("true");

private static final Logger logger = LogManager.getLogger(NativeHelper.class);

private static final String[] errStr = new String[257];
Expand Down Expand Up @@ -116,25 +114,6 @@ public static <T> void dropBatch(List<OwnershipResource<T>> memories) {
}
}


public static <T> T enhanceCheck(T afterProxy, Class<T> nativeInterface) {
return (T) Proxy.newProxyInstance(nativeInterface.getClassLoader(), new Class[]{nativeInterface}, (Object proxy, Method method, Object[] args) -> {
KernelVersionLimit annotation = Optional.ofNullable(method.getAnnotation(KernelVersionLimit.class)).orElse(nativeInterface.getAnnotation(KernelVersionLimit.class));
if (annotation != null) {
if (!osLinux) {
logger.error("This method is only supported on Linux");
throw new ErrorKernelVersionException();
}
if (!allowCurrentLinuxVersion(annotation.major(), annotation.minor())) {
logger.error("This method is only supported on Linux kernel version {}.{}, current version is {}.{}",
annotation.major(), annotation.minor(), currentLinuxMajor, currentLinuxMinor);
throw new ErrorKernelVersionException(annotation.major(), annotation.minor());
}
}
return method.invoke(afterProxy, args);
});
}

public static boolean inSameEventLoop(IoUringEventLoop eventLoop, Object o) {
if (isSkipSameEventLoopCheck) {
return true;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package top.dreamlike.panama.uring.nativelib.helper;

import top.dreamlike.panama.generator.proxy.NativeArray;
import top.dreamlike.panama.generator.proxy.StructProxyGenerator;
import top.dreamlike.panama.uring.nativelib.Instance;

import java.lang.foreign.MemorySegment;

public class OSIoUringProbe {

private final int lastOp;

private final IoUringProbeOp[] ops;

public OSIoUringProbe() {
var probe = Instance.LIB_URING.io_uring_get_probe();
if (probe == null) {
throw new RuntimeException("Failed to get probe");
}
lastOp = probe.getLastOp();
byte len = probe.getOpsLen();
ops = new IoUringProbeOp[len];
MemorySegment opsBase = StructProxyGenerator.findMemorySegment(probe)
.asSlice(top.dreamlike.panama.uring.nativelib.struct.liburing.IoUringProbe.OPS_OFFSET)
.reinterpret(len * top.dreamlike.panama.uring.nativelib.struct.liburing.IoUringProbeOp.LAYOUT.byteSize());

NativeArray<top.dreamlike.panama.uring.nativelib.struct.liburing.IoUringProbeOp> ops = Instance.STRUCT_PROXY_GENERATOR.enhanceArray(opsBase);
for (byte i = 0; i < len; i++) {
top.dreamlike.panama.uring.nativelib.struct.liburing.IoUringProbeOp op = ops.get(i);
this.ops[i] = new IoUringProbeOp(op.getOp(), op.getFlags());
}

Instance.LIB_URING.io_uring_free_probe(probe);
}

public int getLastOp() {
return lastOp;
}

public IoUringProbeOp[] getOps() {
return ops;
}

public record IoUringProbeOp(byte op, short flags) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,13 @@
@CLib("liburing-ffi.so")
@KernelVersionLimit(major = 5, minor = 10)
public interface LibUring {
//跟队列本身相关的操作

@NativeFunction(returnIsPointer = true)
IoUringProbe io_uring_get_probe();

void io_uring_free_probe(@Pointer IoUringProbe probe);

//跟队列本身相关的操作=
int io_uring_queue_init(int entries, @Pointer IoUring ring, int flags);

int io_uring_queue_init_params(int entries, @Pointer IoUring ring, @Pointer IoUringParams p);
Expand Down Expand Up @@ -204,6 +209,13 @@ default IoUringSqe io_uring_get_sqe(@Pointer IoUring ring) {
return Instance.STRUCT_PROXY_GENERATOR.enhance(currentSqe);
}

@NativeFunction(fast = true)
default void io_uring_back_sqe(@Pointer IoUring ring) {
MemorySegment realMemory = StructProxyGenerator.findMemorySegment(ring);
int tail = (int) IoUringConstant.AccessShortcuts.IO_URING_SQ_SQE_TAIL_VARHANDLE.get(realMemory, 0L);
IoUringConstant.AccessShortcuts.IO_URING_SQ_SQE_TAIL_VARHANDLE.set(realMemory, 0L, tail - 1);
}

default void io_uring_prep_rw(int opcode, @Pointer IoUringSqe sqe, int fd, MemorySegment addr, int len, long offset) {
if (!StructProxyGenerator.isNativeStruct(sqe)) {
throw new StructException("sqe is not struct,pleace call StructProxyGenerator::enhance before calling native function");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package top.dreamlike.panama.uring.nativelib.struct.liburing;

import top.dreamlike.panama.generator.annotation.NativeArrayMark;
import top.dreamlike.panama.uring.nativelib.Instance;

import java.lang.foreign.MemoryLayout;
import java.lang.foreign.MemorySegment;

public class IoUringProbe {

public static final MemoryLayout LAYOUT = Instance.STRUCT_PROXY_GENERATOR.extract(IoUringProbe.class);

public static final long OPS_OFFSET = LAYOUT.byteOffset(MemoryLayout.PathElement.groupElement("ops"));

private byte lastOp;

private byte opsLen;

private short resv;

@NativeArrayMark(size = int.class, length = 3)
private MemorySegment resv2;

@NativeArrayMark(size = IoUringProbeOp.class, length = 0)
private MemorySegment ops;

public byte getLastOp() {
return lastOp;
}

public void setLastOp(byte lastOp) {
this.lastOp = lastOp;
}

public byte getOpsLen() {
return opsLen;
}

public void setOpsLen(byte opsLen) {
this.opsLen = opsLen;
}

public short getResv() {
return resv;
}

public void setResv(short resv) {
this.resv = resv;
}

public MemorySegment getResv2() {
return resv2;
}

public void setResv2(MemorySegment resv2) {
this.resv2 = resv2;
}

public MemorySegment getOps() {
return ops;
}

public void setOps(MemorySegment ops) {
this.ops = ops;
}
}
Loading

0 comments on commit 08c2a9d

Please sign in to comment.