Skip to content

Commit c15692d

Browse files
committed
switch to lzf_flex crate
1 parent 4ce6d37 commit c15692d

File tree

4 files changed

+40
-40
lines changed

4 files changed

+40
-40
lines changed

native/Cargo.lock

Lines changed: 1 addition & 20 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

native/core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ jni = "0.21"
5555
snap = "1.1"
5656
brotli = "3.3"
5757
flate2 = "1.0"
58-
lz4 = "1.24"
58+
lz4_flex = "0.11.3"
5959
zstd = "0.11"
6060
rand = { workspace = true}
6161
num = { workspace = true }

native/core/benches/shuffle_writer.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,12 @@ fn criterion_benchmark(c: &mut Criterion) {
6363
let mut cursor = Cursor::new(&mut buffer);
6464
let ipc_time = Time::default();
6565
b.iter(|| {
66-
write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Lz4Frame, &ipc_time)
66+
write_ipc_compressed(
67+
&batch,
68+
&mut cursor,
69+
&CompressionCodec::Lz4Frame(0),
70+
&ipc_time,
71+
)
6772
});
6873
});
6974
group.bench_function("shuffle_writer: encode and compress (lz4 block)", |b| {
@@ -77,7 +82,6 @@ fn criterion_benchmark(c: &mut Criterion) {
7782
});
7883
group.bench_function("shuffle_writer: end to end", |b| {
7984
let ctx = SessionContext::new();
80-
let batch = create_batch(8192, true);
8185
let exec = create_shuffle_writer_exec(CompressionCodec::Zstd(1));
8286
b.iter(|| {
8387
let task_ctx = ctx.task_ctx();
@@ -90,9 +94,10 @@ fn criterion_benchmark(c: &mut Criterion) {
9094

9195
fn create_shuffle_writer_exec(compression_codec: CompressionCodec) -> ShuffleWriterExec {
9296
let batches = create_batches(8192, 10);
97+
let schema = batches[0].schema();
9398
let partitions = &[batches];
9499
let exec = ShuffleWriterExec::try_new(
95-
Arc::new(MemoryExec::try_new(partitions, batches[0].schema(), None).unwrap()),
100+
Arc::new(MemoryExec::try_new(partitions, schema, None).unwrap()),
96101
Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16),
97102
compression_codec,
98103
"/tmp/data.out".to_string(),

native/core/src/execution/shuffle/shuffle_writer.rs

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ use datafusion_physical_expr::EquivalenceProperties;
5353
use futures::executor::block_on;
5454
use futures::{lock::Mutex, Stream, StreamExt, TryFutureExt, TryStreamExt};
5555
use itertools::Itertools;
56+
use lz4_flex::compress_prepend_size;
5657
use simd_adler32::Adler32;
5758
use std::io::Error;
5859
use std::{
@@ -1603,32 +1604,45 @@ pub fn write_ipc_compressed<W: Write + Seek>(
16031604
arrow_writer.finish()?;
16041605
let ipc_encoded = arrow_writer.into_inner()?;
16051606

1606-
// TODO refactor to compress directly to output buffer and avoid a copy
1607-
let compressed = lz4::block::compress(ipc_encoded, None, true)?;
1607+
let compressed = compress_prepend_size(ipc_encoded);
16081608
output.write_all(&compressed)?;
1609-
16101609
output
1610+
1611+
// lz4 crate impl
1612+
// // TODO refactor to compress directly to output buffer and avoid a copy
1613+
// let compressed = lz4::block::compress(ipc_encoded, None, true)?;
1614+
// output.write_all(&compressed)?;
1615+
//
1616+
// output
16111617
}
1612-
CompressionCodec::Lz4Frame(level) => {
1618+
CompressionCodec::Lz4Frame(_level) => {
16131619
// write IPC first without compression
16141620
let mut buffer = vec![];
16151621
let mut arrow_writer = StreamWriter::try_new(&mut buffer, &batch.schema())?;
16161622
arrow_writer.write(batch)?;
16171623
arrow_writer.finish()?;
16181624
let ipc_encoded = arrow_writer.into_inner()?;
16191625

1620-
let mut encoder = lz4::EncoderBuilder::new()
1621-
.content_size(ipc_encoded.len() as u64)
1622-
// .block_mode(lz4::BlockMode::Independent)
1623-
// .block_size(BlockSize::Default)
1624-
// .checksum(ContentChecksum::NoChecksum)
1625-
// .block_checksum(BlockChecksum::BlockChecksumEnabled)
1626-
.level(*level as u32)
1627-
.build(&mut *output)?;
1628-
encoder.write_all(ipc_encoded.as_slice())?;
1629-
let (output, result) = encoder.finish();
1630-
result?;
1631-
output
1626+
// compress
1627+
let mut reader = Cursor::new(ipc_encoded);
1628+
let mut wtr = lz4_flex::frame::FrameEncoder::new(output);
1629+
std::io::copy(&mut reader, &mut wtr).expect("I/O operation failed");
1630+
wtr.finish().unwrap()
1631+
1632+
1633+
// lz4 crate version
1634+
// let mut encoder = lz4::EncoderBuilder::new()
1635+
// .content_size(ipc_encoded.len() as u64)
1636+
// // .block_mode(lz4::BlockMode::Independent)
1637+
// // .block_size(BlockSize::Default)
1638+
// // .checksum(ContentChecksum::NoChecksum)
1639+
// // .block_checksum(BlockChecksum::BlockChecksumEnabled)
1640+
// .level(*level as u32)
1641+
// .build(&mut *output)?;
1642+
// encoder.write_all(ipc_encoded.as_slice())?;
1643+
// let (output, result) = encoder.finish();
1644+
// result?;
1645+
// output
16321646
}
16331647
CompressionCodec::Zstd(level) => {
16341648
let encoder = zstd::Encoder::new(output, *level)?;

0 commit comments

Comments
 (0)