From c4bcabc840be1a375b2f20e2a8a224ccb321903e Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Mon, 10 Mar 2025 10:30:05 -0400 Subject: [PATCH 1/4] fix close overrides --- .../src/main/java/dev/vortex/api/Array.java | 3 +++ .../src/main/java/dev/vortex/api/ArrayStream.java | 3 +++ .../dev/vortex/spark/read/VortexColumnVector.java | 3 +-- .../dev/vortex/spark/read/VortexPartitionReader.java | 12 ++++++------ 4 files changed, 13 insertions(+), 8 deletions(-) diff --git a/java/vortex-jni/src/main/java/dev/vortex/api/Array.java b/java/vortex-jni/src/main/java/dev/vortex/api/Array.java index 35653b4573..2cc5ac808d 100644 --- a/java/vortex-jni/src/main/java/dev/vortex/api/Array.java +++ b/java/vortex-jni/src/main/java/dev/vortex/api/Array.java @@ -43,4 +43,7 @@ public interface Array extends AutoCloseable { String getUTF8(int index); byte[] getBinary(int index); + + @Override + void close(); } diff --git a/java/vortex-jni/src/main/java/dev/vortex/api/ArrayStream.java b/java/vortex-jni/src/main/java/dev/vortex/api/ArrayStream.java index d1019315b7..1292e419f7 100644 --- a/java/vortex-jni/src/main/java/dev/vortex/api/ArrayStream.java +++ b/java/vortex-jni/src/main/java/dev/vortex/api/ArrayStream.java @@ -26,4 +26,7 @@ public interface ArrayStream extends AutoCloseable { * It is an error to call this method if a previous invocation returned false. */ boolean next(); + + @Override + void close(); } diff --git a/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexColumnVector.java b/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexColumnVector.java index e6ceb47082..891afc9e05 100644 --- a/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexColumnVector.java +++ b/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexColumnVector.java @@ -17,7 +17,6 @@ import dev.vortex.api.Array; import dev.vortex.spark.SparkTypes; -import java.io.IOException; import org.apache.spark.sql.types.Decimal; import org.apache.spark.sql.vectorized.ColumnVector; import org.apache.spark.sql.vectorized.ColumnarArray; @@ -36,7 +35,7 @@ public VortexColumnVector(Array array) { public void close() { try { array.close(); - } catch (IOException e) { + } catch (Exception e) { throw new RuntimeException("Failed to close Vortex Array", e); } } diff --git a/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexPartitionReader.java b/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexPartitionReader.java index d5eab41338..b6330727ea 100644 --- a/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexPartitionReader.java +++ b/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexPartitionReader.java @@ -15,20 +15,20 @@ */ package dev.vortex.spark.read; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - import dev.vortex.api.Array; import dev.vortex.api.ArrayStream; import dev.vortex.api.File; import dev.vortex.api.ScanOptions; import dev.vortex.impl.NativeFile; import dev.vortex.spark.VortexFilePartition; -import java.io.IOException; -import java.util.Objects; import org.apache.spark.sql.connector.read.PartitionReader; import org.apache.spark.sql.vectorized.ColumnarBatch; +import java.util.Objects; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + /** * A {@link PartitionReader} that reads columnar batches out of a Vortex file into * Vortex memory format. @@ -72,7 +72,7 @@ void initNativeResources() { } @Override - public void close() throws IOException { + public void close() { checkState(Objects.nonNull(file), "File was closed"); checkState(Objects.nonNull(arrayStream), "ArrayStream was closed"); From 4f889a1d8879e37e1296f65aeefa645cc36c6d39 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Mon, 10 Mar 2025 12:20:56 -0400 Subject: [PATCH 2/4] consume field dtypes when building struct dtype --- vortex-ffi/src/dtype.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/vortex-ffi/src/dtype.rs b/vortex-ffi/src/dtype.rs index fcd78776da..449b34eb29 100644 --- a/vortex-ffi/src/dtype.rs +++ b/vortex-ffi/src/dtype.rs @@ -68,7 +68,8 @@ pub unsafe extern "C" fn DType_new_struct( for i in 0..len { let name_ptr = *names.add(i as usize); let name: Arc = CStr::from_ptr(name_ptr).to_string_lossy().into(); - let dtype = (**dtypes.add(i as usize)).clone(); + let dtype = Box::from_raw(*dtypes.add(i as usize)); + let dtype = *dtype; rust_names.push(name); field_dtypes.push(dtype); @@ -301,8 +302,6 @@ mod tests { let dtypes = [name, age]; let person = DType_new_struct(names.as_ptr(), dtypes.as_ptr(), 2, false); - DType_free(name); - DType_free(age); assert_eq!(DType_get(person), DTYPE_STRUCT); assert_eq!(DType_field_count(person), 2); From dc698c9b3fbbc1c50c63c44ff2760cc58223da77 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Mon, 10 Mar 2025 16:46:06 -0400 Subject: [PATCH 3/4] expose Array is_invalid over FFI --- .../java/dev/vortex/impl/NativeArray.java | 4 +- .../src/main/java/dev/vortex/jni/FFI.java | 2 + .../src/test/java/dev/vortex/jni/FFITest.java | 72 ------------------- .../spark/read/VortexPartitionReader.java | 9 ++- .../java/dev/vortex/spark/VortexScanTest.java | 2 + vortex-ffi/src/array.rs | 9 +++ 6 files changed, 19 insertions(+), 79 deletions(-) delete mode 100644 java/vortex-jni/src/test/java/dev/vortex/jni/FFITest.java diff --git a/java/vortex-jni/src/main/java/dev/vortex/impl/NativeArray.java b/java/vortex-jni/src/main/java/dev/vortex/impl/NativeArray.java index 7ceb731ea0..670784fc8f 100644 --- a/java/vortex-jni/src/main/java/dev/vortex/impl/NativeArray.java +++ b/java/vortex-jni/src/main/java/dev/vortex/impl/NativeArray.java @@ -84,8 +84,8 @@ public Array slice(int start, int stop) { @Override public boolean getNull(int index) { - // check validity of the array - return false; + checkNotNull(inner, "inner"); + return FFI.FFIArray_is_null(inner, index); } @Override diff --git a/java/vortex-jni/src/main/java/dev/vortex/jni/FFI.java b/java/vortex-jni/src/main/java/dev/vortex/jni/FFI.java index 5182cab4dc..b1fe0559eb 100644 --- a/java/vortex-jni/src/main/java/dev/vortex/jni/FFI.java +++ b/java/vortex-jni/src/main/java/dev/vortex/jni/FFI.java @@ -37,6 +37,8 @@ public final class FFI { public static native FFIArray FFIArray_slice(FFIArray array, int start, int stop); + public static native boolean FFIArray_is_null(FFIArray array, int index); + public static native FFIArray FFIArray_get_field(FFIArray array, int index); public static native byte FFIArray_get_u8(FFIArray array, int index); diff --git a/java/vortex-jni/src/test/java/dev/vortex/jni/FFITest.java b/java/vortex-jni/src/test/java/dev/vortex/jni/FFITest.java deleted file mode 100644 index 8283fdd8ea..0000000000 --- a/java/vortex-jni/src/test/java/dev/vortex/jni/FFITest.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * (c) Copyright 2025 SpiralDB Inc. All rights reserved. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dev.vortex.jni; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -import dev.vortex.api.DType; -import dev.vortex.api.ScanOptions; -import dev.vortex.impl.NativeFile; -import java.nio.file.Path; -import java.nio.file.Paths; -import org.junit.jupiter.api.Test; - -public final class FFITest { - private static final Path LINEITEM = Paths.get(".") - .toAbsolutePath() - .getParent() - .getParent() - .getParent() - .resolve("bench-vortex/data/tpch/1/vortex_compressed/lineitem.vortex") - .toAbsolutePath(); - - @Test - public void testDType() { - // Provide a simple test for DType checking. - try (NativeFile lineitem = NativeFile.open(LINEITEM.toString())) { - try (DType dtype = lineitem.getDType()) { - System.out.println("dtype: " + dtype); - } - } - } - - @Test - public void testScan() { - var path = Paths.get(".") - .toAbsolutePath() - .getParent() - .getParent() - .getParent() - .resolve("bench-vortex/data/tpch/1/vortex_compressed/lineitem.vortex") - .toAbsolutePath() - .toString(); - - long rowCount = 0; - try (var file = NativeFile.open(path); - var scan = file.newScan(ScanOptions.of())) { - - while (scan.next()) { - try (var array = scan.getCurrent()) { - rowCount += array.getLen(); - } - } - } catch (Exception e) { - throw new RuntimeException("Failed closing resources", e); - } - - assertEquals(6001215L, rowCount); - } -} diff --git a/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexPartitionReader.java b/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexPartitionReader.java index b6330727ea..2be3b1a7bd 100644 --- a/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexPartitionReader.java +++ b/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexPartitionReader.java @@ -15,20 +15,19 @@ */ package dev.vortex.spark.read; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + import dev.vortex.api.Array; import dev.vortex.api.ArrayStream; import dev.vortex.api.File; import dev.vortex.api.ScanOptions; import dev.vortex.impl.NativeFile; import dev.vortex.spark.VortexFilePartition; +import java.util.Objects; import org.apache.spark.sql.connector.read.PartitionReader; import org.apache.spark.sql.vectorized.ColumnarBatch; -import java.util.Objects; - -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - /** * A {@link PartitionReader} that reads columnar batches out of a Vortex file into * Vortex memory format. diff --git a/java/vortex-spark/src/test/java/dev/vortex/spark/VortexScanTest.java b/java/vortex-spark/src/test/java/dev/vortex/spark/VortexScanTest.java index 7e906b52c6..05064ce0bb 100644 --- a/java/vortex-spark/src/test/java/dev/vortex/spark/VortexScanTest.java +++ b/java/vortex-spark/src/test/java/dev/vortex/spark/VortexScanTest.java @@ -19,8 +19,10 @@ import java.nio.file.Path; import java.nio.file.Paths; import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +@Disabled final class VortexScanTest { private static final Path TPCH_ROOT = Paths.get("/Volumes/Code/vortex/bench-vortex/data/tpch/1/vortex_compressed"); diff --git a/vortex-ffi/src/array.rs b/vortex-ffi/src/array.rs index 05c641d706..5525e093fe 100644 --- a/vortex-ffi/src/array.rs +++ b/vortex-ffi/src/array.rs @@ -82,6 +82,15 @@ pub unsafe extern "C" fn FFIArray_slice( Box::into_raw(Box::new(FFIArray { inner: sliced })) } +#[unsafe(no_mangle)] +pub unsafe extern "C" fn FFIArray_is_null(array: *const FFIArray, index: u32) -> bool { + let array = &*array; + array + .inner + .is_invalid(index as usize) + .vortex_expect("FFIArray_is_null: is_invalid") +} + macro_rules! ffiarray_get_ptype { ($ptype:ident) => { paste::paste! { From 40b9fae4b1efccab7469141c4b58f8a23709e68f Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 11 Mar 2025 16:24:44 -0400 Subject: [PATCH 4/4] add: FFIArray_null_count + java binding --- .../src/main/java/dev/vortex/api/Array.java | 2 ++ .../src/main/java/dev/vortex/impl/NativeArray.java | 6 ++++++ .../vortex-jni/src/main/java/dev/vortex/jni/FFI.java | 2 ++ vortex-ffi/src/array.rs | 12 ++++++++++++ 4 files changed, 22 insertions(+) diff --git a/java/vortex-jni/src/main/java/dev/vortex/api/Array.java b/java/vortex-jni/src/main/java/dev/vortex/api/Array.java index 2cc5ac808d..343eb19ae4 100644 --- a/java/vortex-jni/src/main/java/dev/vortex/api/Array.java +++ b/java/vortex-jni/src/main/java/dev/vortex/api/Array.java @@ -26,6 +26,8 @@ public interface Array extends AutoCloseable { boolean getNull(int index); + int getNullCount(); + byte getByte(int index); short getShort(int index); diff --git a/java/vortex-jni/src/main/java/dev/vortex/impl/NativeArray.java b/java/vortex-jni/src/main/java/dev/vortex/impl/NativeArray.java index 670784fc8f..72d6965449 100644 --- a/java/vortex-jni/src/main/java/dev/vortex/impl/NativeArray.java +++ b/java/vortex-jni/src/main/java/dev/vortex/impl/NativeArray.java @@ -88,6 +88,12 @@ public boolean getNull(int index) { return FFI.FFIArray_is_null(inner, index); } + @Override + public int getNullCount() { + checkNotNull(inner, "inner"); + return FFI.FFIArray_null_count(inner); + } + @Override public byte getByte(int index) { checkNotNull(inner, "inner"); diff --git a/java/vortex-jni/src/main/java/dev/vortex/jni/FFI.java b/java/vortex-jni/src/main/java/dev/vortex/jni/FFI.java index b1fe0559eb..994053f254 100644 --- a/java/vortex-jni/src/main/java/dev/vortex/jni/FFI.java +++ b/java/vortex-jni/src/main/java/dev/vortex/jni/FFI.java @@ -39,6 +39,8 @@ public final class FFI { public static native boolean FFIArray_is_null(FFIArray array, int index); + public static native int FFIArray_null_count(FFIArray array); + public static native FFIArray FFIArray_get_field(FFIArray array, int index); public static native byte FFIArray_get_u8(FFIArray array, int index); diff --git a/vortex-ffi/src/array.rs b/vortex-ffi/src/array.rs index 5525e093fe..f1fc71131f 100644 --- a/vortex-ffi/src/array.rs +++ b/vortex-ffi/src/array.rs @@ -91,6 +91,18 @@ pub unsafe extern "C" fn FFIArray_is_null(array: *const FFIArray, index: u32) -> .vortex_expect("FFIArray_is_null: is_invalid") } +#[unsafe(no_mangle)] +pub unsafe extern "C" fn FFIArray_null_count(array: *const FFIArray) -> u32 { + let array = &*array; + array + .inner + .as_ref() + .invalid_count() + .vortex_expect("FFIArray_null_count: invalid count") + .try_into() + .vortex_expect("FFIArray_null_count: invalid count to u32") +} + macro_rules! ffiarray_get_ptype { ($ptype:ident) => { paste::paste! {