
Commit 0e40460

HawaiianSpork and alamb authored
fix: Panic on reencoding offsets in arrow-ipc with sliced nested arrays (#6998)
* fix: Panic on reencoding offsets

  The code computed the second argument of slice_with_length as (data.offset() + data.len() + 1) * size, as if it were an end position. slice_with_length takes a length that is added to the start, so the start was effectively counted twice. This caused the encoded batches to be larger than they needed to be and would result in a panic for certain slices.

* Add tests for slicing larger arrays

* Run rustfmt

* Added an end-to-end unit test which shows the problem is fixed.

---------

Co-authored-by: Andrew Lamb <[email protected]>
1 parent 8508063 commit 0e40460

File tree

1 file changed: +156 −5 lines changed

arrow-ipc/src/writer.rs

Lines changed: 156 additions & 5 deletions
@@ -1520,10 +1520,7 @@ fn reencode_offsets<O: OffsetSizeTrait>(
     let offsets = match start_offset.as_usize() {
         0 => {
             let size = size_of::<O>();
-            offsets.slice_with_length(
-                data.offset() * size,
-                (data.offset() + data.len() + 1) * size,
-            )
+            offsets.slice_with_length(data.offset() * size, (data.len() + 1) * size)
         }
         _ => offset_slice.iter().map(|x| *x - *start_offset).collect(),
     };
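For context, the second argument of Buffer::slice_with_length is a length that is added to the starting offset, not an end position. Below is a minimal standalone sketch of the arithmetic; the 100-list buffer is made-up toy data, with the offset 75 and length 7 borrowed from the reencode_offsets unit test added later in this commit:

use arrow_buffer::Buffer;

fn main() {
    // Toy offsets buffer for 100 lists: 101 i32 offsets = 404 bytes.
    let offsets = Buffer::from_vec((0..=100i32).collect::<Vec<i32>>());
    assert_eq!(offsets.len(), 404);

    // Re-encode a slice of 7 lists starting at list 75.
    let (offset, len, size) = (75usize, 7usize, std::mem::size_of::<i32>());

    // Fixed code: skip 75 offsets, keep len + 1 = 8 offsets (32 bytes).
    let sliced = offsets.slice_with_length(offset * size, (len + 1) * size);
    assert_eq!(sliced.len(), 32);

    // The old code passed (offset + len + 1) * size = 332 bytes as the *length*,
    // i.e. it asked for bytes 300..632 of a 404-byte buffer. For small offsets
    // this merely kept extra bytes alive in the encoded batch; for slices far
    // enough into the buffer it runs past the end and panics:
    // offsets.slice_with_length(offset * size, (offset + len + 1) * size);
}

This matches the commit message: encoded batches carried more offset bytes than necessary, and certain slices panicked outright.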
@@ -1840,9 +1837,9 @@ mod tests {
18401837
use std::io::Cursor;
18411838
use std::io::Seek;
18421839

1843-
use arrow_array::builder::GenericListBuilder;
18441840
use arrow_array::builder::MapBuilder;
18451841
use arrow_array::builder::UnionBuilder;
1842+
use arrow_array::builder::{GenericListBuilder, ListBuilder, StringBuilder};
18461843
use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
18471844
use arrow_array::types::*;
18481845
use arrow_buffer::ScalarBuffer;
@@ -2480,6 +2477,126 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_large_slice_uint32() {
+        ensure_roundtrip(Arc::new(UInt32Array::from_iter((0..8000).map(|i| {
+            if i % 2 == 0 {
+                Some(i)
+            } else {
+                None
+            }
+        }))));
+    }
+
+    #[test]
+    fn test_large_slice_string() {
+        let strings: Vec<_> = (0..8000)
+            .map(|i| {
+                if i % 2 == 0 {
+                    Some(format!("value{}", i))
+                } else {
+                    None
+                }
+            })
+            .collect();
+
+        ensure_roundtrip(Arc::new(StringArray::from(strings)));
+    }
+
+    #[test]
+    fn test_large_slice_string_list() {
+        let mut ls = ListBuilder::new(StringBuilder::new());
+
+        let mut s = String::new();
+        for row_number in 0..8000 {
+            if row_number % 2 == 0 {
+                for list_element in 0..1000 {
+                    s.clear();
+                    use std::fmt::Write;
+                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
+                    ls.values().append_value(&s);
+                }
+                ls.append(true)
+            } else {
+                ls.append(false); // null
+            }
+        }
+
+        ensure_roundtrip(Arc::new(ls.finish()));
+    }
+
+    #[test]
+    fn test_large_slice_string_list_of_lists() {
+        // This test exists to exercise reencode_offsets, which looks at both the
+        // starting offset and the data offset, so we need a dataset where the
+        // starting offset is zero but the data offset is not.
+        let mut ls = ListBuilder::new(ListBuilder::new(StringBuilder::new()));
+
+        for _ in 0..4000 {
+            ls.values().append(true);
+            ls.append(true)
+        }
+
+        let mut s = String::new();
+        for row_number in 0..4000 {
+            if row_number % 2 == 0 {
+                for list_element in 0..1000 {
+                    s.clear();
+                    use std::fmt::Write;
+                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
+                    ls.values().values().append_value(&s);
+                }
+                ls.values().append(true);
+                ls.append(true)
+            } else {
+                ls.append(false); // null
+            }
+        }
+
+        ensure_roundtrip(Arc::new(ls.finish()));
+    }
+
+    /// Read/write a record batch to a File and Stream and ensure it is the same at the output
+    fn ensure_roundtrip(array: ArrayRef) {
+        let num_rows = array.len();
+        let orig_batch = RecordBatch::try_from_iter(vec![("a", array)]).unwrap();
+        // take off the first element
+        let sliced_batch = orig_batch.slice(1, num_rows - 1);
+
+        let schema = orig_batch.schema();
+        let stream_data = {
+            let mut writer = StreamWriter::try_new(vec![], &schema).unwrap();
+            writer.write(&sliced_batch).unwrap();
+            writer.into_inner().unwrap()
+        };
+        let read_batch = {
+            let projection = None;
+            let mut reader = StreamReader::try_new(Cursor::new(stream_data), projection).unwrap();
+            reader
+                .next()
+                .expect("expect no errors reading batch")
+                .expect("expect batch")
+        };
+        assert_eq!(sliced_batch, read_batch);
+
+        let file_data = {
+            let mut writer = FileWriter::try_new_buffered(vec![], &schema).unwrap();
+            writer.write(&sliced_batch).unwrap();
+            writer.into_inner().unwrap().into_inner().unwrap()
+        };
+        let read_batch = {
+            let projection = None;
+            let mut reader = FileReader::try_new(Cursor::new(file_data), projection).unwrap();
+            reader
+                .next()
+                .expect("expect no errors reading batch")
+                .expect("expect batch")
+        };
+        assert_eq!(sliced_batch, read_batch);
+
+        // TODO test file writer/reader
+    }
+
     #[test]
     fn encode_bools_slice() {
         // Test case for https://github.com/apache/arrow-rs/issues/3496
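The list-of-lists test above targets a subtle precondition: reencode_offsets takes its `0 =>` arm only when the first stored offset is zero, and the fixed bug only bites there when the ArrayData's own offset is non-zero. A minimal sketch, using hypothetical two-row toy data rather than the test's 8000-row dataset, of how leading empty lists produce exactly that shape:

use arrow_array::builder::{Int32Builder, ListBuilder};
use arrow_array::Array;

fn main() {
    let mut builder = ListBuilder::new(Int32Builder::new());
    builder.append(true); // [] -- the first stored offset stays 0
    builder.values().append_value(1);
    builder.values().append_value(2);
    builder.append(true); // [1, 2]
    let list = builder.finish(); // offsets buffer: [0, 0, 2]

    // Slice off the first row at the ArrayData level, as the tests in this
    // commit do: the data now has offset() == 1, yet the first offset it
    // refers to, offsets[1], is still 0 -- the shape that routes
    // reencode_offsets into its `0 =>` arm with a non-zero data offset.
    let data = list.into_data().slice(1, 1);
    assert_eq!(data.offset(), 1);
    assert_eq!(data.buffers()[0].typed_data::<i32>()[data.offset()], 0);
}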
@@ -2662,6 +2779,40 @@ mod tests {
         builder.finish()
     }
 
+    #[test]
+    fn reencode_offsets_when_first_offset_is_not_zero() {
+        let original_list = generate_list_data::<i32>();
+        let original_data = original_list.into_data();
+        let slice_data = original_data.slice(75, 7);
+        let (new_offsets, original_start, length) =
+            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
+        assert_eq!(
+            vec![0, 3, 6, 9, 12, 15, 18, 21],
+            new_offsets.typed_data::<i32>()
+        );
+        assert_eq!(225, original_start);
+        assert_eq!(21, length);
+    }
+
+    #[test]
+    fn reencode_offsets_when_first_offset_is_zero() {
+        let mut ls = GenericListBuilder::<i32, _>::new(UInt32Builder::new());
+        // ls = [[], [35, 42]]
+        ls.append(true);
+        ls.values().append_value(35);
+        ls.values().append_value(42);
+        ls.append(true);
+        let original_list = ls.finish();
+        let original_data = original_list.into_data();
+
+        let slice_data = original_data.slice(1, 1);
+        let (new_offsets, original_start, length) =
+            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
+        assert_eq!(vec![0, 2], new_offsets.typed_data::<i32>());
+        assert_eq!(0, original_start);
+        assert_eq!(2, length);
+    }
+
     /// Ensure when serde full & sliced versions they are equal to original input.
     /// Also ensure serialized sliced version is significantly smaller than serialized full.
     fn roundtrip_ensure_sliced_smaller(in_batch: RecordBatch, expected_size_factor: usize) {
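The expected values in the first test follow from the `_ =>` arm shown in the first hunk, which rebases every offset against the slice's start. A standalone sketch of that arithmetic in plain Rust; the 3-elements-per-list shape is an assumption inferred from the asserted values, not stated in the commit:

fn main() {
    // Offsets for rows 75..=82 of a list array where every list holds 3 values.
    let offset_slice: Vec<i32> = (75..=82).map(|i| i * 3).collect(); // [225, 228, ..., 246]
    let start_offset = offset_slice[0]; // 225

    // Rebase so the slice's first list starts at 0, as the `_ =>` arm does.
    let new_offsets: Vec<i32> = offset_slice.iter().map(|x| *x - start_offset).collect();
    assert_eq!(new_offsets, vec![0, 3, 6, 9, 12, 15, 18, 21]);

    // The writer then ships only child values [225, 225 + 21): the returned
    // (original_start, length) pair asserted by the test.
    let length = (new_offsets.last().unwrap() - new_offsets.first().unwrap()) as usize;
    assert_eq!((start_offset as usize, length), (225, 21));
}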
