
Commit ee16437

Add support for reading parquet file thanks to arrow-dataset #576
1 parent 62b4894 commit ee16437

8 files changed (+219 / -13 lines)

dataframe-arrow/api/dataframe-arrow.api

Lines changed: 8 additions & 0 deletions
@@ -35,6 +35,14 @@ public final class org/jetbrains/kotlinx/dataframe/io/ArrowReadingKt {
     public static synthetic fun readArrowIPC$default (Lorg/jetbrains/kotlinx/dataframe/DataFrame$Companion;Ljava/net/URL;Lorg/jetbrains/kotlinx/dataframe/api/NullabilityOptions;ILjava/lang/Object;)Lorg/jetbrains/kotlinx/dataframe/DataFrame;
     public static synthetic fun readArrowIPC$default (Lorg/jetbrains/kotlinx/dataframe/DataFrame$Companion;Ljava/nio/channels/ReadableByteChannel;Lorg/apache/arrow/memory/RootAllocator;Lorg/jetbrains/kotlinx/dataframe/api/NullabilityOptions;ILjava/lang/Object;)Lorg/jetbrains/kotlinx/dataframe/DataFrame;
     public static synthetic fun readArrowIPC$default (Lorg/jetbrains/kotlinx/dataframe/DataFrame$Companion;[BLorg/jetbrains/kotlinx/dataframe/api/NullabilityOptions;ILjava/lang/Object;)Lorg/jetbrains/kotlinx/dataframe/DataFrame;
+    public static final fun readParquet (Lorg/jetbrains/kotlinx/dataframe/DataFrame$Companion;[Ljava/io/File;Lorg/jetbrains/kotlinx/dataframe/api/NullabilityOptions;J)Lorg/jetbrains/kotlinx/dataframe/DataFrame;
+    public static final fun readParquet (Lorg/jetbrains/kotlinx/dataframe/DataFrame$Companion;[Ljava/lang/String;Lorg/jetbrains/kotlinx/dataframe/api/NullabilityOptions;J)Lorg/jetbrains/kotlinx/dataframe/DataFrame;
+    public static final fun readParquet (Lorg/jetbrains/kotlinx/dataframe/DataFrame$Companion;[Ljava/net/URL;Lorg/jetbrains/kotlinx/dataframe/api/NullabilityOptions;J)Lorg/jetbrains/kotlinx/dataframe/DataFrame;
+    public static final fun readParquet (Lorg/jetbrains/kotlinx/dataframe/DataFrame$Companion;[Ljava/nio/file/Path;Lorg/jetbrains/kotlinx/dataframe/api/NullabilityOptions;J)Lorg/jetbrains/kotlinx/dataframe/DataFrame;
+    public static synthetic fun readParquet$default (Lorg/jetbrains/kotlinx/dataframe/DataFrame$Companion;[Ljava/io/File;Lorg/jetbrains/kotlinx/dataframe/api/NullabilityOptions;JILjava/lang/Object;)Lorg/jetbrains/kotlinx/dataframe/DataFrame;
+    public static synthetic fun readParquet$default (Lorg/jetbrains/kotlinx/dataframe/DataFrame$Companion;[Ljava/lang/String;Lorg/jetbrains/kotlinx/dataframe/api/NullabilityOptions;JILjava/lang/Object;)Lorg/jetbrains/kotlinx/dataframe/DataFrame;
+    public static synthetic fun readParquet$default (Lorg/jetbrains/kotlinx/dataframe/DataFrame$Companion;[Ljava/net/URL;Lorg/jetbrains/kotlinx/dataframe/api/NullabilityOptions;JILjava/lang/Object;)Lorg/jetbrains/kotlinx/dataframe/DataFrame;
+    public static synthetic fun readParquet$default (Lorg/jetbrains/kotlinx/dataframe/DataFrame$Companion;[Ljava/nio/file/Path;Lorg/jetbrains/kotlinx/dataframe/api/NullabilityOptions;JILjava/lang/Object;)Lorg/jetbrains/kotlinx/dataframe/DataFrame;
     public static final fun toDataFrame (Lorg/apache/arrow/vector/ipc/ArrowReader;Lorg/jetbrains/kotlinx/dataframe/api/NullabilityOptions;)Lorg/jetbrains/kotlinx/dataframe/DataFrame;
     public static synthetic fun toDataFrame$default (Lorg/apache/arrow/vector/ipc/ArrowReader;Lorg/jetbrains/kotlinx/dataframe/api/NullabilityOptions;ILjava/lang/Object;)Lorg/jetbrains/kotlinx/dataframe/DataFrame;
 }
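Each readParquet entry in this ABI dump comes in a pair: the public JVM method plus the synthetic readParquet$default bridge that the Kotlin compiler emits for the default parameter values (nullability and batchSize). The Kotlin declaration behind the File-array entry, as added in arrowReading.kt below, has this shape:

public fun DataFrame.Companion.readParquet(
    vararg files: File,
    nullability: NullabilityOptions = NullabilityOptions.Infer,
    batchSize: Long = ARROW_PARQUET_DEFAULT_BATCH_SIZE,
): AnyFrame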

dataframe-arrow/build.gradle.kts

Lines changed: 1 addition & 0 deletions
@@ -20,6 +20,7 @@ dependencies {
     implementation(libs.arrow.vector)
     implementation(libs.arrow.format)
     implementation(libs.arrow.memory)
+    implementation(libs.arrow.dataset)
     implementation(libs.commonsCompress)
     implementation(libs.kotlin.reflect)
     implementation(libs.kotlin.datetimeJvm)
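The libs.arrow.dataset alias resolves through the version catalog entry added in gradle/libs.versions.toml below (group org.apache.arrow, name arrow-dataset, version.ref arrow). As a rough sketch, the equivalent coordinate-based declaration in a standalone Gradle Kotlin DSL build would be:

dependencies {
    // same artifact the catalog alias points at, with arrow = "18.3.0"
    implementation("org.apache.arrow:arrow-dataset:18.3.0")
}

Note that arrow-dataset is a JNI binding over the native Arrow C++ dataset implementation (the org.apache.arrow.dataset.jni imports below come from it), so it is heavier at runtime than the pure-JVM Arrow modules already listed here.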

dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReading.kt

Lines changed: 56 additions & 0 deletions
@@ -1,5 +1,6 @@
 package org.jetbrains.kotlinx.dataframe.io
 
+import org.apache.arrow.dataset.file.FileFormat
 import org.apache.arrow.memory.RootAllocator
 import org.apache.arrow.vector.ipc.ArrowReader
 import org.apache.commons.compress.utils.SeekableInMemoryByteChannel
@@ -16,6 +17,7 @@ import java.nio.channels.Channels
 import java.nio.channels.ReadableByteChannel
 import java.nio.channels.SeekableByteChannel
 import java.nio.file.Files
+import java.nio.file.Path
 
 public class ArrowFeather : SupportedDataFrameFormat {
     override fun readDataFrame(stream: InputStream, header: List<String>): AnyFrame =
@@ -36,6 +38,8 @@ public class ArrowFeather : SupportedDataFrameFormat {
 
 private const val READ_ARROW_FEATHER = "readArrowFeather"
 
+internal const val ARROW_PARQUET_DEFAULT_BATCH_SIZE = 32768L
+
 private class DefaultReadArrowMethod(path: String?) :
     AbstractDefaultReadMethod(path, MethodArguments.EMPTY, READ_ARROW_FEATHER)
 
@@ -185,3 +189,55 @@ public fun DataFrame.Companion.readArrow(
  */
 public fun ArrowReader.toDataFrame(nullability: NullabilityOptions = NullabilityOptions.Infer): AnyFrame =
     DataFrame.Companion.readArrowImpl(this, nullability)
+
+/**
+ * Read [Parquet](https://parquet.apache.org/) data from existing [urls] by using [Arrow Dataset](https://arrow.apache.org/docs/java/dataset.html)
+ */
+public fun DataFrame.Companion.readParquet(
+    vararg urls: URL,
+    nullability: NullabilityOptions = NullabilityOptions.Infer,
+    batchSize: Long = ARROW_PARQUET_DEFAULT_BATCH_SIZE,
+): AnyFrame =
+    readArrowDatasetImpl(
+        urls.map {
+            it.toString()
+        }.toTypedArray(),
+        FileFormat.PARQUET,
+        nullability,
+        batchSize,
+    )
+
+/**
+ * Read [Parquet](https://parquet.apache.org/) data from existing [strUrls] by using [Arrow Dataset](https://arrow.apache.org/docs/java/dataset.html)
+ */
+public fun DataFrame.Companion.readParquet(
+    vararg strUrls: String,
+    nullability: NullabilityOptions = NullabilityOptions.Infer,
+    batchSize: Long = ARROW_PARQUET_DEFAULT_BATCH_SIZE,
+): AnyFrame = readArrowDatasetImpl(arrayOf(*strUrls), FileFormat.PARQUET, nullability, batchSize)
+
+/**
+ * Read [Parquet](https://parquet.apache.org/) data from existing [paths] by using [Arrow Dataset](https://arrow.apache.org/docs/java/dataset.html)
+ */
+public fun DataFrame.Companion.readParquet(
+    vararg paths: Path,
+    nullability: NullabilityOptions = NullabilityOptions.Infer,
+    batchSize: Long = ARROW_PARQUET_DEFAULT_BATCH_SIZE,
+): AnyFrame = readArrowDatasetImpl(paths.map { "file:$it" }.toTypedArray(), FileFormat.PARQUET, nullability, batchSize)
+
+/**
+ * Read [Parquet](https://parquet.apache.org/) data from existing [files] by using [Arrow Dataset](https://arrow.apache.org/docs/java/dataset.html)
+ */
+public fun DataFrame.Companion.readParquet(
+    vararg files: File,
+    nullability: NullabilityOptions = NullabilityOptions.Infer,
+    batchSize: Long = ARROW_PARQUET_DEFAULT_BATCH_SIZE,
+): AnyFrame =
+    readArrowDatasetImpl(
+        files.map {
+            "file:${it.toPath()}"
+        }.toTypedArray(),
+        FileFormat.PARQUET,
+        nullability,
+        batchSize,
+    )
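A minimal usage sketch of the new API (the file names below are illustrative; the Path, File, URL, and String overloads behave the same way):

import org.jetbrains.kotlinx.dataframe.DataFrame
import org.jetbrains.kotlinx.dataframe.io.readParquet
import java.nio.file.Path

fun main() {
    // Read a single local Parquet file (hypothetical path).
    val df = DataFrame.readParquet(Path.of("data.parquet"))
    println(df.rowsCount())

    // Several files are read through one Arrow Dataset scan and concatenated into
    // a single frame; batchSize tunes the size of the record batches being scanned.
    val combined = DataFrame.readParquet(
        Path.of("part-0.parquet"),
        Path.of("part-1.parquet"),
        batchSize = 64L * 1024,
    )
    println(combined.rowsCount())
}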

dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt

Lines changed: 57 additions & 0 deletions
@@ -6,6 +6,11 @@ import kotlinx.datetime.LocalTime
 import kotlinx.datetime.toKotlinLocalDate
 import kotlinx.datetime.toKotlinLocalDateTime
 import kotlinx.datetime.toKotlinLocalTime
+import org.apache.arrow.dataset.file.FileFormat
+import org.apache.arrow.dataset.file.FileSystemDatasetFactory
+import org.apache.arrow.dataset.jni.DirectReservationListener
+import org.apache.arrow.dataset.jni.NativeMemoryPool
+import org.apache.arrow.dataset.scanner.ScanOptions
 import org.apache.arrow.memory.RootAllocator
 import org.apache.arrow.vector.BigIntVector
 import org.apache.arrow.vector.BitVector
@@ -59,10 +64,13 @@ import org.jetbrains.kotlinx.dataframe.api.emptyDataFrame
 import org.jetbrains.kotlinx.dataframe.api.getColumn
 import org.jetbrains.kotlinx.dataframe.api.toDataFrame
 import org.jetbrains.kotlinx.dataframe.impl.asList
+import java.io.File
 import java.math.BigDecimal
 import java.math.BigInteger
+import java.net.URI
 import java.nio.channels.ReadableByteChannel
 import java.nio.channels.SeekableByteChannel
+import java.nio.file.Files
 import kotlin.reflect.KType
 import kotlin.reflect.full.withNullability
 import kotlin.reflect.typeOf
@@ -414,3 +422,52 @@ internal fun DataFrame.Companion.readArrowImpl(
         return flattened.concatKeepingSchema()
     }
 }
+
+private fun resolveArrowDatasetUris(fileUris: Array<String>): Array<String> =
+    fileUris.map {
+        when {
+            it.startsWith("http:", true) || it.startsWith("https:", true) -> {
+                val url = URI.create(it).toURL()
+                val tempFile = File.createTempFile("kdf", ".parquet")
+                tempFile.deleteOnExit()
+                url.openStream().use { input ->
+                    Files.copy(input, tempFile.toPath())
+                    "file:${tempFile.toPath()}"
+                }
+            }
+
+            !it.startsWith("file:", true) && File(it).exists() -> {
+                "file:$it"
+            }
+
+            else -> it
+        }
+    }.toTypedArray()
+
+/**
+ * Read [Arrow Dataset](https://arrow.apache.org/docs/java/dataset.html) from [fileUris]
+ */
+internal fun DataFrame.Companion.readArrowDatasetImpl(
+    fileUris: Array<String>,
+    fileFormat: FileFormat,
+    nullability: NullabilityOptions = NullabilityOptions.Infer,
+    batchSize: Long = ARROW_PARQUET_DEFAULT_BATCH_SIZE,
+): AnyFrame {
+    val scanOptions = ScanOptions(batchSize)
+    RootAllocator().use { allocator ->
+        FileSystemDatasetFactory(
+            allocator,
+            NativeMemoryPool.createListenable(DirectReservationListener.instance()),
+            fileFormat,
+            resolveArrowDatasetUris(fileUris),
+        ).use { datasetFactory ->
+            datasetFactory.finish().use { dataset ->
+                dataset.newScan(scanOptions).use { scanner ->
+                    scanner.scanBatches().use { reader ->
+                        return readArrowImpl(reader, nullability)
+                    }
+                }
+            }
+        }
+    }
+}
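For context, scanner.scanBatches() hands readArrowImpl a standard ArrowReader, which streams record batches into a reusable VectorSchemaRoot. A standalone sketch of that consumption pattern, assuming the caller only wants a row count rather than a DataFrame (countParquetRows and the fileUri argument are illustrative, not part of this commit):

import org.apache.arrow.dataset.file.FileFormat
import org.apache.arrow.dataset.file.FileSystemDatasetFactory
import org.apache.arrow.dataset.jni.DirectReservationListener
import org.apache.arrow.dataset.jni.NativeMemoryPool
import org.apache.arrow.dataset.scanner.ScanOptions
import org.apache.arrow.memory.RootAllocator

// Counts rows across all Parquet batches at a "file:" URI, mirroring the
// allocator -> factory -> dataset -> scanner -> reader chain used above.
fun countParquetRows(fileUri: String, batchSize: Long = 32768L): Long {
    val scanOptions = ScanOptions(batchSize)
    RootAllocator().use { allocator ->
        FileSystemDatasetFactory(
            allocator,
            NativeMemoryPool.createListenable(DirectReservationListener.instance()),
            FileFormat.PARQUET,
            arrayOf(fileUri),
        ).use { factory ->
            factory.finish().use { dataset ->
                dataset.newScan(scanOptions).use { scanner ->
                    scanner.scanBatches().use { reader ->
                        var rows = 0L
                        // Each loadNextBatch() call refills the reader's VectorSchemaRoot
                        // with the next record batch until the scan is exhausted.
                        while (reader.loadNextBatch()) {
                            rows += reader.vectorSchemaRoot.rowCount
                        }
                        return rows
                    }
                }
            }
        }
    }
}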

dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt

Lines changed: 67 additions & 0 deletions
@@ -48,8 +48,10 @@ import org.junit.Test
 import java.io.ByteArrayInputStream
 import java.io.ByteArrayOutputStream
 import java.io.File
+import java.net.URI
 import java.net.URL
 import java.nio.channels.Channels
+import java.nio.file.FileSystems
 import java.sql.DriverManager
 import java.util.Locale
 import kotlin.reflect.typeOf
@@ -653,4 +655,69 @@ internal class ArrowKtTest {
             DataFrame.readArrow(dbArrowReader) shouldBe expected
         }
     }
+
+    @Test
+    fun testReadParquetPath() {
+        val resourceLocation = testResource("test.arrow.parquet").path
+        val resourcePath = FileSystems.getDefault().getPath(resourceLocation)
+        val dataFrame = DataFrame.readParquet(resourcePath)
+        dataFrame.rowsCount() shouldBe 300
+        assertEstimations(
+            exampleFrame = dataFrame,
+            expectedNullable = false,
+            hasNulls = false,
+            fromParquet = true,
+        )
+    }
+
+    @Test
+    fun testReadParquetFile() {
+        val resourceLocation = testResource("test.arrow.parquet").path
+        val resourcePath = FileSystems.getDefault().getPath(resourceLocation)
+        val dataFrame = DataFrame.readParquet(resourcePath.toFile())
+        dataFrame.rowsCount() shouldBe 300
+        assertEstimations(
+            exampleFrame = dataFrame,
+            expectedNullable = false,
+            hasNulls = false,
+            fromParquet = true,
+        )
+    }
+
+    @Test
+    fun testReadParquetStringPath() {
+        val resourceLocation = testResource("test.arrow.parquet").path
+        val resourcePath = FileSystems.getDefault().getPath(resourceLocation)
+        val dataFrame = DataFrame.readParquet("$resourcePath")
+        dataFrame.rowsCount() shouldBe 300
+        assertEstimations(
+            exampleFrame = dataFrame,
+            expectedNullable = false,
+            hasNulls = false,
+            fromParquet = true,
+        )
+    }
+
+    @Test
+    fun testReadParquetUrl() {
+        val resourceLocation = testResource("test.arrow.parquet").path
+        val resourcePath = FileSystems.getDefault().getPath(resourceLocation)
+        val fileUrl = URI.create("file:$resourcePath").toURL()
+        val dataFrame = DataFrame.readParquet(fileUrl)
+        dataFrame.rowsCount() shouldBe 300
+        assertEstimations(
+            exampleFrame = dataFrame,
+            expectedNullable = false,
+            hasNulls = false,
+            fromParquet = true,
+        )
+    }
+
+    @Test
+    fun testReadMultipleParquetFiles() {
+        val resourceLocation = testResource("test.arrow.parquet").path
+        val resourcePath = FileSystems.getDefault().getPath(resourceLocation)
+        val dataFrame = DataFrame.readParquet(resourcePath, resourcePath, resourcePath)
+        dataFrame.rowsCount() shouldBe 900
+    }
 }

dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/exampleEstimatesAssertions.kt

Lines changed: 27 additions & 11 deletions
@@ -24,7 +24,12 @@ import java.time.LocalTime as JavaLocalTime
  * Assert that we have got the same data that was originally saved on example creation.
  * Example generation project is currently located at https://github.com/Kopilov/arrow_example
  */
-internal fun assertEstimations(exampleFrame: AnyFrame, expectedNullable: Boolean, hasNulls: Boolean) {
+internal fun assertEstimations(
+    exampleFrame: AnyFrame,
+    expectedNullable: Boolean,
+    hasNulls: Boolean,
+    fromParquet: Boolean = false,
+) {
     /**
      * In [exampleFrame] we get two concatenated batches. To assert the estimations, we should transform frame row number to batch row number
      */
@@ -142,16 +147,27 @@ internal fun assertEstimations(exampleFrame: AnyFrame, expectedNullable: Boolean
         assertValueOrNull(iBatch(i), element, JavaLocalDate.ofEpochDay(iBatch(i).toLong() * 30).toKotlinLocalDate())
     }
 
-    val datetimeCol = exampleFrame["date64"] as DataColumn<LocalDateTime?>
-    datetimeCol.type() shouldBe typeOf<LocalDateTime>().withNullability(expectedNullable)
-    datetimeCol.forEachIndexed { i, element ->
-        assertValueOrNull(
-            rowNumber = iBatch(i),
-            actual = element,
-            expected = JavaLocalDateTime
-                .ofEpochSecond(iBatch(i).toLong() * 60 * 60 * 24 * 30, 0, ZoneOffset.UTC)
-                .toKotlinLocalDateTime(),
-        )
+    if (fromParquet) {
+        // The Parquet format has only one date logical type, with no time component: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date
+        val datetimeCol = exampleFrame["date64"] as DataColumn<LocalDate?>
+        datetimeCol.type() shouldBe typeOf<LocalDate>().withNullability(expectedNullable)
+        datetimeCol.forEachIndexed { i, element ->
+            assertValueOrNull(iBatch(i), element, JavaLocalDate.ofEpochDay(iBatch(i).toLong() * 30).toKotlinLocalDate())
+        }
+    } else {
+        val datetimeCol = exampleFrame["date64"] as DataColumn<LocalDateTime?>
+        datetimeCol.type() shouldBe typeOf<LocalDateTime>().withNullability(expectedNullable)
+        datetimeCol.forEachIndexed { i, element ->
+            assertValueOrNull(
+                rowNumber = iBatch(i),
+                actual = element,
+                expected = JavaLocalDateTime.ofEpochSecond(
+                    iBatch(i).toLong() * 60 * 60 * 24 * 30,
+                    0,
+                    ZoneOffset.UTC,
+                ).toKotlinLocalDateTime(),
+            )
+        }
     }
 
     val timeSecCol = exampleFrame["time32_seconds"] as DataColumn<LocalTime?>
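The practical upshot for callers of readParquet: because Parquet's DATE logical type carries no time-of-day, a column that reads back as LocalDateTime from Feather/IPC comes back as kotlinx.datetime.LocalDate from Parquet. A hedged sketch of how a caller might widen it (the file path is illustrative, and the "date64" column name is just the one used by this test data):

import kotlinx.datetime.LocalDate
import kotlinx.datetime.atTime
import org.jetbrains.kotlinx.dataframe.DataFrame
import org.jetbrains.kotlinx.dataframe.api.*
import org.jetbrains.kotlinx.dataframe.io.readParquet
import java.nio.file.Path

fun main() {
    val df = DataFrame.readParquet(Path.of("test.arrow.parquet")) // illustrative path
    // "date64" arrives as LocalDate from Parquet; widen it to LocalDateTime at midnight
    // if downstream code was written against the Feather/IPC reading path.
    val widened = df.convert { "date64"<LocalDate>() }.with { it.atTime(0, 0) }
    println(widened["date64"].type())
}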
Binary file not shown.

gradle/libs.versions.toml

Lines changed: 3 additions & 2 deletions
@@ -45,7 +45,7 @@ junit-platform = "1.11.3"
 kotestAsserions = "6.0.0.M1"
 
 jsoup = "1.18.3"
-arrow = "18.1.0"
+arrow = "18.3.0"
 kodex = "0.4.4"
 simpleGit = "2.2.1"
 dependencyVersions = "0.51.0"
@@ -54,7 +54,7 @@ shadow = "8.3.5"
 android-gradle-api = "7.3.1" # need to revise our tests to update
 ktor = "3.0.1" # needs jupyter compatibility with Kotlin 2.1 to update
 kotlin-compile-testing = "1.6.0"
-duckdb = "1.1.3"
+duckdb = "1.2.2.0"
 buildconfig = "5.5.1"
 benchmark = "0.4.12"
 
@@ -124,6 +124,7 @@ arrow-format = { group = "org.apache.arrow", name = "arrow-format", version.ref
 arrow-vector = { group = "org.apache.arrow", name = "arrow-vector", version.ref = "arrow" }
 arrow-memory = { group = "org.apache.arrow", name = "arrow-memory-unsafe", version.ref = "arrow" }
 arrow-c-data = { group = "org.apache.arrow", name = "arrow-c-data", version.ref = "arrow" }
+arrow-dataset = { group = "org.apache.arrow", name = "arrow-dataset", version.ref = "arrow" }
 
 geotools-main = { module = "org.geotools:gt-main", version.ref = "geotools" }
 geotools-shapefile = { module = "org.geotools:gt-shapefile", version.ref = "geotools" }
