Commit 291d8f1

Add support for LZ4 compression
1 parent ea6d205 commit 291d8f1

30 files changed: +326 -236 lines changed

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 8 additions & 9 deletions
@@ -272,18 +272,17 @@ object CometConf extends ShimCometConf {
     .booleanConf
     .createWithDefault(false)
 
-  val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] = conf(
-    s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
-    .doc(
-      "The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. " +
-        "Compression can be disabled by setting spark.shuffle.compress=false.")
-    .stringConf
-    .checkValues(Set("zstd"))
-    .createWithDefault("zstd")
+  val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] =
+    conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
+      .doc("The codec of Comet native shuffle used to compress shuffle data. Only lz4 and zstd " +
+        "are supported. Compression can be disabled by setting spark.shuffle.compress=false.")
+      .stringConf
+      .checkValues(Set("zstd", "lz4"))
+      .createWithDefault("lz4")
 
   val COMET_EXEC_SHUFFLE_COMPRESSION_LEVEL: ConfigEntry[Int] =
     conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.level")
-      .doc("The compression level to use when compression shuffle files.")
+      .doc("The compression level to use when compressing shuffle files with zstd.")
       .intConf
       .createWithDefault(1)

common/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/IpcInputStreamIterator.scala

Lines changed: 0 additions & 129 deletions
This file was deleted.

docs/source/user-guide/configs.md

Lines changed: 2 additions & 2 deletions
@@ -50,8 +50,8 @@ Comet provides the following configuration settings.
 | spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. | 0.7 |
 | spark.comet.exec.project.enabled | Whether to enable project by default. | true |
 | spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
-| spark.comet.exec.shuffle.compression.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. Compression can be disabled by setting spark.shuffle.compress=false. | zstd |
-| spark.comet.exec.shuffle.compression.level | The compression level to use when compression shuffle files. | 1 |
+| spark.comet.exec.shuffle.compression.codec | The codec of Comet native shuffle used to compress shuffle data. Only lz4 and zstd are supported. Compression can be disabled by setting spark.shuffle.compress=false. | lz4 |
+| spark.comet.exec.shuffle.compression.level | The compression level to use when compressing shuffle files with zstd. | 1 |
 | spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | true |
 | spark.comet.exec.sort.enabled | Whether to enable sort by default. | true |
 | spark.comet.exec.sortMergeJoin.enabled | Whether to enable sortMergeJoin by default. | true |

native/Cargo.lock

Lines changed: 1 addition & 20 deletions
Some generated files are not rendered by default.

native/core/Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -55,7 +55,7 @@ jni = "0.21"
 snap = "1.1"
 brotli = "3.3"
 flate2 = "1.0"
-lz4 = "1.24"
+lz4_flex = "0.11.3"
 zstd = "0.11"
 rand = { workspace = true}
 num = { workspace = true }
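
Since this swaps the `lz4` C-binding crate for the pure-Rust `lz4_flex`, a minimal round-trip sketch of the `lz4_flex` frame API may be useful; the `FrameEncoder`/`FrameDecoder` types below come from `lz4_flex` 0.11's `frame` module, and the payload bytes are invented for illustration.

```rust
use std::io::{Read, Write};

use lz4_flex::frame::{FrameDecoder, FrameEncoder};

fn main() {
    // Hypothetical payload, standing in for a serialized shuffle block.
    let payload = b"example shuffle block bytes";

    // Compress into an in-memory buffer using the self-describing LZ4 frame
    // format, which a reader can decode without knowing the original size.
    let mut encoder = FrameEncoder::new(Vec::new());
    encoder.write_all(payload).expect("compress");
    let compressed = encoder.finish().expect("finish frame");

    // Decompress by wrapping the compressed bytes in a FrameDecoder.
    let mut decoder = FrameDecoder::new(&compressed[..]);
    let mut decompressed = Vec::new();
    decoder.read_to_end(&mut decompressed).expect("decompress");

    assert_eq!(payload.as_slice(), decompressed.as_slice());
}
```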

native/core/src/execution/jni_api.rs

Lines changed: 32 additions & 11 deletions
@@ -17,6 +17,7 @@
 
 //! Define JNI APIs which can be called from Java/Scala.
 
+use super::{serde, utils::SparkArrowConvert, CometMemoryPool};
 use arrow::datatypes::DataType as ArrowDataType;
 use arrow_array::RecordBatch;
 use datafusion::{
@@ -40,8 +41,6 @@ use jni::{
 use std::time::{Duration, Instant};
 use std::{collections::HashMap, sync::Arc, task::Poll};
 
-use super::{serde, utils::SparkArrowConvert, CometMemoryPool};
-
 use crate::{
     errors::{try_unwrap_or_throw, CometError, CometResult},
     execution::{
@@ -60,6 +59,7 @@ use jni::{
 use tokio::runtime::Runtime;
 
 use crate::execution::operators::ScanExec;
+use crate::execution::shuffle::read_ipc_compressed;
 use crate::execution::spark_plan::SparkPlan;
 use log::info;
 
@@ -95,7 +95,7 @@ struct ExecutionContext {
 
 /// Accept serialized query plan and return the address of the native query plan.
 /// # Safety
-/// This function is inheritly unsafe since it deals with raw pointers passed from JNI.
+/// This function is inherently unsafe since it deals with raw pointers passed from JNI.
 #[no_mangle]
 pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
     e: JNIEnv,
@@ -231,7 +231,7 @@ fn prepare_output(
     array_addrs: jlongArray,
     schema_addrs: jlongArray,
     output_batch: RecordBatch,
-    exec_context: &mut ExecutionContext,
+    debug_native: bool,
 ) -> CometResult<jlong> {
     let array_address_array = unsafe { JLongArray::from_raw(array_addrs) };
     let num_cols = env.get_array_length(&array_address_array)? as usize;
@@ -255,7 +255,7 @@
         )));
     }
 
-    if exec_context.debug_native {
+    if debug_native {
         // Validate the output arrays.
         for array in results.iter() {
             let array_data = array.to_data();
@@ -275,9 +275,6 @@
         i += 1;
     }
 
-    // Update metrics
-    update_metrics(env, exec_context)?;
-
     Ok(num_rows as jlong)
 }
 
@@ -298,7 +295,7 @@ fn pull_input_batches(exec_context: &mut ExecutionContext) -> Result<(), CometEr
 /// Accept serialized query plan and the addresses of Arrow Arrays from Spark,
 /// then execute the query. Return addresses of arrow vector.
 /// # Safety
-/// This function is inheritly unsafe since it deals with raw pointers passed from JNI.
+/// This function is inherently unsafe since it deals with raw pointers passed from JNI.
 #[no_mangle]
 pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
     e: JNIEnv,
@@ -358,12 +355,14 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
 
         match poll_output {
             Poll::Ready(Some(output)) => {
+                // Update metrics
+                update_metrics(&mut env, exec_context)?;
                 return prepare_output(
                     &mut env,
                     array_addrs,
                     schema_addrs,
                     output?,
-                    exec_context,
+                    exec_context.debug_native,
                 );
             }
             Poll::Ready(None) => {
@@ -459,7 +458,7 @@ fn get_execution_context<'a>(id: i64) -> &'a mut ExecutionContext {
 
 /// Used by Comet shuffle external sorter to write sorted records to disk.
 /// # Safety
-/// This function is inheritly unsafe since it deals with raw pointers passed from JNI.
+/// This function is inherently unsafe since it deals with raw pointers passed from JNI.
 #[no_mangle]
 pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative(
     e: JNIEnv,
@@ -544,3 +543,25 @@ pub extern "system" fn Java_org_apache_comet_Native_sortRowPartitionsNative(
         Ok(())
     })
 }
+
+#[no_mangle]
+/// Used by Comet native shuffle reader
+/// # Safety
+/// This function is inherently unsafe since it deals with raw pointers passed from JNI.
+pub unsafe extern "system" fn Java_org_apache_comet_Native_decodeShuffleBlock(
+    e: JNIEnv,
+    _class: JClass,
+    byte_array: jbyteArray,
+    array_addrs: jlongArray,
+    schema_addrs: jlongArray,
+) -> jlong {
+    try_unwrap_or_throw(&e, |mut env| {
+        let value_array = unsafe { JPrimitiveArray::from_raw(byte_array) };
+        let length = env.get_array_length(&value_array)?;
+        let elements = unsafe { env.get_array_elements(&value_array, ReleaseMode::NoCopyBack)? };
+        let raw_pointer = elements.as_ptr();
+        let slice = unsafe { std::slice::from_raw_parts(raw_pointer, length as usize) };
+        let batch = read_ipc_compressed(slice)?;
+        prepare_output(&mut env, array_addrs, schema_addrs, batch, false)
+    })
+}
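
The `read_ipc_compressed` helper called by the new JNI function lives in `shuffle_writer.rs` and is not shown in this diff. A hedged sketch of what such a helper can look like, assuming each shuffle block is an LZ4-frame-compressed Arrow IPC stream carrying one batch; the function name `read_ipc_compressed_sketch` and its error handling are illustrative, not the commit's actual code.

```rust
use arrow::error::ArrowError;
use arrow::ipc::reader::StreamReader;
use arrow_array::RecordBatch;
use lz4_flex::frame::FrameDecoder;

// Illustrative stand-in for read_ipc_compressed: undo the LZ4 frame, then
// decode a single batch from the Arrow IPC stream inside it.
fn read_ipc_compressed_sketch(bytes: &[u8]) -> Result<RecordBatch, ArrowError> {
    let decoder = FrameDecoder::new(bytes);
    let mut reader = StreamReader::try_new(decoder, None)?;
    reader
        .next()
        .ok_or_else(|| ArrowError::IpcError("empty shuffle block".to_string()))?
}
```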

native/core/src/execution/planner.rs

Lines changed: 1 addition & 0 deletions
@@ -1055,6 +1055,7 @@ impl PhysicalPlanner {
                 Ok(SparkCompressionCodec::Zstd) => {
                     Ok(CompressionCodec::Zstd(writer.compression_level))
                 }
+                Ok(SparkCompressionCodec::Lz4) => Ok(CompressionCodec::Lz4Frame),
                 _ => Err(ExecutionError::GeneralError(format!(
                     "Unsupported shuffle compression codec: {:?}",
                     writer.codec
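
Note the asymmetry between the two arms: `Zstd` carries a configurable compression level while `Lz4Frame` does not, which is why the config docs above now scope `spark.comet.exec.shuffle.compression.level` to zstd. A hedged sketch of how such a dispatch can be exercised, using a local stand-in enum rather than the project's actual types:

```rust
use std::io::Write;

// Local stand-ins that mirror the shape of the codec enum in this diff.
enum CompressionCodec {
    Lz4Frame,  // LZ4 frame format; takes no level parameter
    Zstd(i32), // zstd with an explicit compression level
}

// Compress a buffer with whichever codec was configured.
fn compress(codec: &CompressionCodec, input: &[u8]) -> std::io::Result<Vec<u8>> {
    match codec {
        CompressionCodec::Lz4Frame => {
            let mut encoder = lz4_flex::frame::FrameEncoder::new(Vec::new());
            encoder.write_all(input)?;
            encoder
                .finish()
                .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
        }
        // zstd::encode_all consumes any Read source and returns compressed bytes.
        CompressionCodec::Zstd(level) => zstd::encode_all(input, *level),
    }
}

fn main() {
    let data = vec![42u8; 1024];
    let lz4_out = compress(&CompressionCodec::Lz4Frame, &data).unwrap();
    let zstd_out = compress(&CompressionCodec::Zstd(1), &data).unwrap();
    println!("lz4: {} bytes, zstd: {} bytes", lz4_out.len(), zstd_out.len());
}
```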

native/core/src/execution/shuffle/mod.rs

Lines changed: 3 additions & 1 deletion
@@ -19,4 +19,6 @@ mod list;
 mod map;
 pub mod row;
 mod shuffle_writer;
-pub use shuffle_writer::{write_ipc_compressed, CompressionCodec, ShuffleWriterExec};
+pub use shuffle_writer::{
+    read_ipc_compressed, write_ipc_compressed, CompressionCodec, ShuffleWriterExec,
+};
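
With both directions now re-exported, here is a hedged write-side counterpart to the reader sketch above, again assuming one Arrow IPC stream batch per LZ4 frame (`write_ipc_compressed_sketch` is illustrative, not the commit's `write_ipc_compressed`):

```rust
use arrow::error::ArrowError;
use arrow::ipc::writer::StreamWriter;
use arrow_array::RecordBatch;
use lz4_flex::frame::FrameEncoder;

// Illustrative stand-in for write_ipc_compressed: serialize one batch as an
// Arrow IPC stream while LZ4-frame-compressing the bytes on the way out.
fn write_ipc_compressed_sketch(batch: &RecordBatch) -> Result<Vec<u8>, ArrowError> {
    let encoder = FrameEncoder::new(Vec::new());
    let mut writer = StreamWriter::try_new(encoder, &batch.schema())?;
    writer.write(batch)?;
    writer.finish()?; // write the IPC end-of-stream marker
    let encoder = writer.into_inner()?;
    // Closing the LZ4 frame returns the underlying Vec<u8> buffer.
    encoder
        .finish()
        .map_err(|e| ArrowError::ExternalError(Box::new(e)))
}
```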
