Skip to content

Commit 5ffc0a8

Browse files
askoaask
and
ask
authored
fix: Handle sliced array in run array iterator (#3681)
* Handle sliced array in run array iterator * incorporate PR comments --------- Co-authored-by: ask <ask@local>
1 parent d011e6a commit 5ffc0a8

File tree

2 files changed

+88
-30
lines changed

2 files changed

+88
-30
lines changed

arrow-array/src/array/run_array.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,11 @@ impl<'a, R: RunEndIndexType, V> TypedRunArray<'a, R, V> {
472472
pub fn values(&self) -> &'a V {
473473
self.values
474474
}
475+
476+
/// Returns the run array of this [`TypedRunArray`]
477+
pub fn run_array(&self) -> &'a RunArray<R> {
478+
self.run_array
479+
}
475480
}
476481

477482
impl<'a, R: RunEndIndexType, V: Sync> Array for TypedRunArray<'a, R, V> {

arrow-array/src/run_iterator.rs

Lines changed: 83 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,10 @@ where
4242
<&'a V as ArrayAccessor>::Item: Default,
4343
{
4444
array: TypedRunArray<'a, R, V>,
45-
current_logical: usize,
46-
current_physical: usize,
47-
current_end_logical: usize,
48-
current_end_physical: usize,
45+
current_front_logical: usize,
46+
current_front_physical: usize,
47+
current_back_logical: usize,
48+
current_back_physical: usize,
4949
}
5050

5151
impl<'a, R, V> RunArrayIter<'a, R, V>
@@ -57,14 +57,19 @@ where
5757
{
5858
/// create a new iterator
5959
pub fn new(array: TypedRunArray<'a, R, V>) -> Self {
60-
let logical_len = array.len();
61-
let physical_len: usize = array.values().len();
60+
let current_front_physical: usize =
61+
array.run_array().get_physical_index(0).unwrap();
62+
let current_back_physical: usize = array
63+
.run_array()
64+
.get_physical_index(array.len() - 1)
65+
.unwrap()
66+
+ 1;
6267
RunArrayIter {
6368
array,
64-
current_logical: 0,
65-
current_physical: 0,
66-
current_end_logical: logical_len,
67-
current_end_physical: physical_len,
69+
current_front_logical: array.offset(),
70+
current_front_physical,
71+
current_back_logical: array.offset() + array.len(),
72+
current_back_physical,
6873
}
6974
}
7075
}
@@ -80,44 +85,46 @@ where
8085

8186
#[inline]
8287
fn next(&mut self) -> Option<Self::Item> {
83-
if self.current_logical == self.current_end_logical {
88+
if self.current_front_logical == self.current_back_logical {
8489
return None;
8590
}
8691
// If current logical index is greater than current run end index then increment
8792
// the physical index.
88-
if self.current_logical
93+
if self.current_front_logical
8994
>= self
9095
.array
9196
.run_ends()
92-
.value(self.current_physical)
97+
.value(self.current_front_physical)
9398
.as_usize()
9499
{
95100
// As the run_ends is expected to be strictly increasing, there
96101
// should be at least one logical entry in one physical entry. Because of this
97102
// reason the next value can be accessed by incrementing physical index once.
98-
self.current_physical += 1;
103+
self.current_front_physical += 1;
99104
}
100-
if self.array.values().is_null(self.current_physical) {
101-
self.current_logical += 1;
105+
if self.array.values().is_null(self.current_front_physical) {
106+
self.current_front_logical += 1;
102107
Some(None)
103108
} else {
104-
self.current_logical += 1;
109+
self.current_front_logical += 1;
105110
// Safety:
106111
// The self.current_physical is kept within bounds of self.current_logical.
107112
// The self.current_logical will not go out of bounds because of the check
108113
// `self.current_logical = self.current_end_logical` above.
109114
unsafe {
110115
Some(Some(
111-
self.array.values().value_unchecked(self.current_physical),
116+
self.array
117+
.values()
118+
.value_unchecked(self.current_front_physical),
112119
))
113120
}
114121
}
115122
}
116123

117124
fn size_hint(&self) -> (usize, Option<usize>) {
118125
(
119-
self.current_end_logical - self.current_logical,
120-
Some(self.current_end_logical - self.current_logical),
126+
self.current_back_logical - self.current_front_logical,
127+
Some(self.current_back_logical - self.current_front_logical),
121128
)
122129
}
123130
}
@@ -130,26 +137,26 @@ where
130137
<&'a V as ArrayAccessor>::Item: Default,
131138
{
132139
fn next_back(&mut self) -> Option<Self::Item> {
133-
if self.current_end_logical == self.current_logical {
140+
if self.current_back_logical == self.current_front_logical {
134141
return None;
135142
}
136143

137-
self.current_end_logical -= 1;
144+
self.current_back_logical -= 1;
138145

139-
if self.current_end_physical > 0
140-
&& self.current_end_logical
146+
if self.current_back_physical > 0
147+
&& self.current_back_logical
141148
< self
142149
.array
143150
.run_ends()
144-
.value(self.current_end_physical - 1)
151+
.value(self.current_back_physical - 1)
145152
.as_usize()
146153
{
147154
// As the run_ends is expected to be strictly increasing, there
148155
// should be at least one logical entry in one physical entry. Because of this
149156
// reason the next value can be accessed by decrementing physical index once.
150-
self.current_end_physical -= 1;
157+
self.current_back_physical -= 1;
151158
}
152-
Some(if self.array.values().is_null(self.current_end_physical) {
159+
Some(if self.array.values().is_null(self.current_back_physical) {
153160
None
154161
} else {
155162
// Safety:
@@ -160,7 +167,7 @@ where
160167
Some(
161168
self.array
162169
.values()
163-
.value_unchecked(self.current_end_physical),
170+
.value_unchecked(self.current_back_physical),
164171
)
165172
}
166173
})
@@ -184,8 +191,8 @@ mod tests {
184191
use crate::{
185192
array::{Int32Array, StringArray},
186193
builder::PrimitiveRunBuilder,
187-
types::Int32Type,
188-
Int64RunArray,
194+
types::{Int16Type, Int32Type},
195+
Array, Int64RunArray, PrimitiveArray, RunArray,
189196
};
190197

191198
fn build_input_array(size: usize) -> Vec<Option<i32>> {
@@ -345,4 +352,50 @@ mod tests {
345352

346353
assert_eq!(expected_vec, result_asref);
347354
}
355+
356+
#[test]
357+
fn test_sliced_run_array_iterator() {
358+
let total_len = 80;
359+
let input_array = build_input_array(total_len);
360+
361+
// Encode the input_array to run array
362+
let mut builder =
363+
PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
364+
builder.extend(input_array.iter().copied());
365+
let run_array = builder.finish();
366+
367+
// test for all slice lengths.
368+
for slice_len in 1..=total_len {
369+
// test for offset = 0, slice length = slice_len
370+
let sliced_run_array: RunArray<Int16Type> =
371+
run_array.slice(0, slice_len).into_data().into();
372+
let sliced_typed_run_array = sliced_run_array
373+
.downcast::<PrimitiveArray<Int32Type>>()
374+
.unwrap();
375+
376+
// Iterate on sliced typed run array
377+
let actual: Vec<Option<i32>> = sliced_typed_run_array.into_iter().collect();
378+
let expected: Vec<Option<i32>> =
379+
input_array.iter().take(slice_len).copied().collect();
380+
assert_eq!(expected, actual);
381+
382+
// test for offset = total_len - slice_len, length = slice_len
383+
let sliced_run_array: RunArray<Int16Type> = run_array
384+
.slice(total_len - slice_len, slice_len)
385+
.into_data()
386+
.into();
387+
let sliced_typed_run_array = sliced_run_array
388+
.downcast::<PrimitiveArray<Int32Type>>()
389+
.unwrap();
390+
391+
// Iterate on sliced typed run array
392+
let actual: Vec<Option<i32>> = sliced_typed_run_array.into_iter().collect();
393+
let expected: Vec<Option<i32>> = input_array
394+
.iter()
395+
.skip(total_len - slice_len)
396+
.copied()
397+
.collect();
398+
assert_eq!(expected, actual);
399+
}
400+
}
348401
}

0 commit comments

Comments
 (0)