Skip to content

Commit b4896a4

Browse files
committed
flesh out columnar batch
1 parent 802fa01 commit b4896a4

File tree

1 file changed

+162
-69
lines changed

1 file changed

+162
-69
lines changed

kernel/src/columnar_batch.rs

Lines changed: 162 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,42 @@
1-
use std::sync::Arc;
2-
31
use crate::{
42
schema::{DataType, Schema, StructField},
53
DeltaResult,
64
};
75

86
pub trait ColumnarBatch {
7+
type Column: ColumnVector;
8+
99
/// Get the schema of the batch.
1010
fn schema(&self) -> DeltaResult<Schema>;
1111

1212
/// Get the column at the specified index.
13-
fn column(&self, index: usize) -> &dyn ColumnVector;
13+
fn column(&self, index: usize) -> Self::Column;
1414

1515
/// Number of rows in the batch.
1616
fn size(&self) -> usize;
1717

18-
fn with_column(
19-
&self,
20-
index: usize,
21-
field: StructField,
22-
column: Arc<dyn ColumnVector>,
23-
) -> DeltaResult<Self>
24-
where
25-
Self: Sized;
18+
// fn with_column(
19+
// &self,
20+
// index: usize,
21+
// field: StructField,
22+
// column: &dyn ColumnVector,
23+
// ) -> DeltaResult<Self>
24+
// where
25+
// Self: Sized;
2626

2727
fn with_deleted_column_at(&self, index: usize) -> DeltaResult<Self>
2828
where
2929
Self: Sized;
3030

31-
fn with_schema(&self, schema: Schema) -> DeltaResult<Self>
32-
where
33-
Self: Sized;
31+
// fn with_schema(&self, schema: Schema) -> DeltaResult<Self>
32+
// where
33+
// Self: Sized;
3434

3535
fn slice(&self, offset: usize, length: usize) -> DeltaResult<Self>
3636
where
3737
Self: Sized;
3838

39-
fn rows(&self) -> Box<dyn Iterator<Item = &dyn Row>>;
39+
fn rows(&self) -> Box<dyn Iterator<Item = Box<dyn Row<Column = Self::Column>>>>;
4040
}
4141

4242
// TODO: do all these methods do bounds checking? Should we offer alternative
@@ -50,43 +50,54 @@ pub trait ColumnVector {
5050
fn is_null(&self, i: usize) -> bool;
5151
fn get_i32(&self, i: usize) -> DeltaResult<Option<i32>>;
5252
fn get_string(&self, i: usize) -> DeltaResult<Option<&str>>;
53-
fn get_struct(&self, i: usize) -> DeltaResult<Option<&dyn Row>>;
54-
fn get_array(&self, i: usize) -> DeltaResult<Option<&dyn ArrayValue>>;
55-
fn get_map(&self, i: usize) -> DeltaResult<Option<&dyn MapValue>>;
53+
fn get_struct(&self, i: usize) -> DeltaResult<Option<Box<dyn Row<Column = Self>>>>;
54+
fn get_array(&self, i: usize) -> DeltaResult<Option<Box<dyn ArrayValue<Column = Self>>>>;
55+
fn get_map(&self, i: usize) -> DeltaResult<Option<Box<dyn MapValue<Column = Self>>>>;
5656
}
5757

5858
pub trait Row {
59+
type Column: ColumnVector;
60+
5961
fn schema(&self) -> DeltaResult<Schema>;
6062
fn is_null(&self, i: usize) -> bool;
6163
fn get_i32(&self, i: usize) -> DeltaResult<Option<i32>>;
6264
fn get_string(&self, i: usize) -> DeltaResult<Option<&str>>;
63-
fn get_struct(&self, i: usize) -> DeltaResult<Option<&dyn Row>>;
64-
fn get_array(&self, i: usize) -> DeltaResult<Option<&dyn ArrayValue>>;
65-
fn get_map(&self, i: usize) -> DeltaResult<Option<&dyn MapValue>>;
65+
fn get_struct(&self, i: usize) -> DeltaResult<Option<Box<dyn Row<Column = Self::Column>>>>;
66+
fn get_array(
67+
&self,
68+
i: usize,
69+
) -> DeltaResult<Option<Box<dyn ArrayValue<Column = Self::Column>>>>;
70+
fn get_map(&self, i: usize) -> DeltaResult<Option<Box<dyn MapValue<Column = Self::Column>>>>;
6671
}
6772

6873
// Based on: https://github.com/delta-io/delta/pull/2087
6974

7075
pub trait ArrayValue {
76+
type Column: ColumnVector;
77+
7178
/// Return the number of elements in the array
7279
fn size(&self) -> usize;
7380

7481
/// Get the elements in the array
75-
fn elements(&self) -> &dyn ColumnVector;
82+
fn elements(&self) -> Self::Column;
7683
}
7784

7885
pub trait MapValue {
86+
type Column: ColumnVector;
87+
7988
/// Return the number of elements in the map
8089
fn size(&self) -> usize;
8190

8291
/// Get the keys in the map
83-
fn keys(&self) -> &dyn ColumnVector;
92+
fn keys(&self) -> Self::Column;
8493

8594
/// Get the values in the map
86-
fn values(&self) -> &dyn ColumnVector;
95+
fn values(&self) -> Self::Column;
8796
}
8897

89-
mod arrow {
98+
pub mod arrow {
99+
use std::sync::Arc;
100+
90101
use arrow_array::cast::AsArray;
91102
use arrow_array::types::Int32Type;
92103
use arrow_array::Array as ArrowArray;
@@ -99,33 +110,67 @@ mod arrow {
99110
use super::*;
100111

101112
impl ColumnarBatch for RecordBatch {
113+
type Column = Arc<dyn ArrowArray>;
114+
102115
fn schema(&self) -> DeltaResult<Schema> {
103116
Ok(self.schema().as_ref().try_into()?)
104117
}
105118

106-
fn column(&self, index: usize) -> &dyn ColumnVector {
107-
&self.column(index).as_ref() as &dyn ColumnVector
119+
fn column(&self, index: usize) -> Self::Column {
120+
self.column(index).clone()
121+
}
122+
123+
fn size(&self) -> usize {
124+
self.num_rows()
125+
}
126+
127+
fn with_deleted_column_at(&self, index: usize) -> DeltaResult<Self>
128+
where
129+
Self: Sized,
130+
{
131+
let indices = (0..self.num_columns())
132+
.filter(|i| *i != index)
133+
.collect::<Vec<_>>();
134+
RecordBatch::project(&self, &indices).map_err(|err| Error::Arrow(err))
135+
}
136+
137+
fn slice(&self, offset: usize, length: usize) -> DeltaResult<Self>
138+
where
139+
Self: Sized,
140+
{
141+
Ok(RecordBatch::slice(self, offset, length))
142+
}
143+
144+
fn rows(&self) -> Box<dyn Iterator<Item = Box<dyn Row<Column = Self::Column>>>> {
145+
let batch = self.clone();
146+
Box::new((0..self.size()).into_iter().map(move |i| {
147+
let row = Box::new(ArrowRow {
148+
batch: batch.clone(),
149+
row_index: i,
150+
});
151+
row as Box<dyn Row<Column = Self::Column>>
152+
}))
108153
}
109154
}
110155

111-
impl ColumnVector for dyn ArrowArray {
156+
impl ColumnVector for Arc<dyn ArrowArray> {
112157
fn data_type(&self) -> DeltaResult<DataType> {
113-
Ok(self.data_type().try_into()?)
158+
Ok(self.as_ref().data_type().try_into()?)
114159
}
115160

116161
fn size(&self) -> usize {
117162
self.len()
118163
}
119164

120165
fn is_null(&self, i: usize) -> bool {
121-
self.is_null(i)
166+
self.as_ref().is_null(i)
122167
}
123168

124169
/// Get the i32 value at the specified index.
125170
///
126171
/// This will panic if the column is not boolean or if the index is out of bounds.
127172
fn get_i32(&self, i: usize) -> DeltaResult<Option<i32>> {
128-
if self.is_null(i) {
173+
if self.as_ref().is_null(i) {
129174
Ok(None)
130175
} else {
131176
Ok(Some(self.as_primitive::<Int32Type>().value(i)))
@@ -136,10 +181,10 @@ mod arrow {
136181
///
137182
/// This will panic if the column is not string or if the index is out of bounds.
138183
fn get_string(&self, i: usize) -> DeltaResult<Option<&str>> {
139-
if self.is_null(i) {
184+
if self.as_ref().is_null(i) {
140185
Ok(None)
141186
} else {
142-
match self.data_type() {
187+
match self.as_ref().data_type() {
143188
ArrowDataType::Utf8 => Ok(Some(self.as_string::<i32>().value(i))),
144189
ArrowDataType::LargeUtf8 => Ok(Some(self.as_string::<i64>().value(i))),
145190
_ => panic!("get_string called on non-string column"),
@@ -150,79 +195,94 @@ mod arrow {
150195
/// Get the struct value at the specified index.
151196
///
152197
/// This will panic if the column is not struct or if the index is out of bounds.
153-
fn get_struct(&self, i: usize) -> DeltaResult<Option<&dyn Row>> {
154-
if self.is_null(i) {
198+
fn get_struct(&self, i: usize) -> DeltaResult<Option<Box<dyn Row<Column = Self>>>> {
199+
if self.as_ref().is_null(i) {
155200
Ok(None)
156201
} else {
157202
let batch = self
158203
.as_struct_opt()
159-
.expect("get_struct called on non-struct column");
204+
.expect("get_struct called on non-struct column")
205+
.clone();
160206
let row = ArrowRow {
161207
batch,
162208
row_index: i,
163209
};
164-
Ok(Some(&row))
210+
Ok(Some(Box::new(row)))
165211
}
166212
}
167213

168214
/// Get the array value at the specified index.
169215
///
170216
/// This will panic if the column is not array or if the index is out of bounds.
171-
fn get_array(&self, i: usize) -> DeltaResult<Option<&dyn ArrayValue>> {
172-
if self.is_null(i) {
217+
fn get_array(&self, i: usize) -> DeltaResult<Option<Box<dyn ArrayValue<Column = Self>>>> {
218+
if self.as_ref().is_null(i) {
173219
Ok(None)
174220
} else {
175-
let batch = self
176-
.as_list_opt()
177-
.expect("get_array called on non-array column");
178-
let array = ArrowArray {
179-
array: batch.value(i),
221+
let sub_array = match self.as_ref().data_type() {
222+
ArrowDataType::List(_) => self.as_list_opt::<i32>().unwrap().value(i),
223+
ArrowDataType::LargeList(_) => self.as_list_opt::<i32>().unwrap().value(i),
224+
_ => panic!("get_array called on non-array column"),
180225
};
181-
Ok(Some(&array))
226+
Ok(Some(Box::new(ArrowArraySlice(sub_array))))
227+
}
228+
}
229+
230+
fn get_map(&self, i: usize) -> DeltaResult<Option<Box<dyn MapValue<Column = Self>>>> {
231+
if self.as_ref().is_null(i) {
232+
Ok(None)
233+
} else {
234+
let arr = self.as_map().value(i);
235+
let map_array = ArrowMapValue {
236+
keys: arr.column(0).clone(),
237+
values: arr.column(1).clone(),
238+
};
239+
Ok(Some(Box::new(map_array)))
182240
}
183241
}
184242
}
185243

186-
trait ArrowTabular {
187-
fn schema(&self) -> &ArrowSchema;
188-
fn column(&self, index: usize) -> &dyn ArrowArray;
244+
pub trait ArrowTabular {
245+
fn schema(&self) -> Arc<ArrowSchema>;
246+
fn column(&self, index: usize) -> &Arc<dyn ArrowArray>;
189247
}
190248

191249
impl ArrowTabular for RecordBatch {
192-
fn schema(&self) -> &ArrowSchema {
193-
self.schema().as_ref()
250+
fn schema(&self) -> Arc<ArrowSchema> {
251+
self.schema()
194252
}
195253

196-
fn column(&self, index: usize) -> &dyn ArrowArray {
197-
self.column(index).as_ref()
254+
fn column(&self, index: usize) -> &Arc<dyn ArrowArray> {
255+
&self.column(index)
198256
}
199257
}
200258

201259
impl ArrowTabular for StructArray {
202-
fn schema(&self) -> &ArrowSchema {
203-
self.schema()
260+
fn schema(&self) -> Arc<ArrowSchema> {
261+
Arc::new(ArrowSchema::new(self.fields().clone()))
204262
}
205263

206-
fn column(&self, index: usize) -> &dyn ArrowArray {
207-
self.column(index).as_ref()
264+
fn column(&self, index: usize) -> &Arc<dyn ArrowArray> {
265+
&self.column(index)
208266
}
209267
}
210268

211269
/// A reference to a row in a RecordBatch or StructArray.
212-
pub struct ArrowRow<'a, T: ArrowTabular> {
213-
batch: &'a T,
270+
pub struct ArrowRow<T: ArrowTabular> {
271+
batch: T,
214272
row_index: usize,
215273
}
216274

217-
impl<'a, T: ArrowTabular> Row for ArrowRow<'a, T> {
275+
impl<T: ArrowTabular> Row for ArrowRow<T> {
276+
type Column = Arc<dyn ArrowArray>;
277+
218278
fn schema(&self) -> DeltaResult<Schema> {
219-
ArrowTabular::schema(self.batch)
279+
ArrowTabular::schema(&self.batch)
220280
.try_into()
221281
.map_err(|err| Error::Arrow(err))
222282
}
223283

224284
fn is_null(&self, i: usize) -> bool {
225-
self.batch.column(i).is_null(self.row_index)
285+
ArrowArray::is_null(self.batch.column(i), self.row_index)
226286
}
227287

228288
fn get_i32(&self, i: usize) -> DeltaResult<Option<i32>> {
@@ -233,24 +293,57 @@ mod arrow {
233293
self.batch.column(i).get_string(self.row_index)
234294
}
235295

236-
fn get_struct(&self, i: usize) -> DeltaResult<Option<&dyn Row>> {
296+
fn get_struct(&self, i: usize) -> DeltaResult<Option<Box<dyn Row<Column = Self::Column>>>> {
237297
self.batch.column(i).get_struct(self.row_index)
238298
}
239299

240-
fn get_array(&self, i: usize) -> DeltaResult<Option<&dyn ArrayValue>> {
300+
fn get_array(
301+
&self,
302+
i: usize,
303+
) -> DeltaResult<Option<Box<dyn ArrayValue<Column = Self::Column>>>> {
241304
self.batch.column(i).get_array(self.row_index)
242305
}
243306

244-
fn get_map(&self, i: usize) -> DeltaResult<Option<&dyn MapValue>> {
307+
fn get_map(
308+
&self,
309+
i: usize,
310+
) -> DeltaResult<Option<Box<dyn MapValue<Column = Self::Column>>>> {
245311
self.batch.column(i).get_map(self.row_index)
246312
}
247313
}
248314

249-
pub struct ArrowArraySlice<'a> {
250-
batch: &'a dyn ArrowArray,
251-
offset: usize,
252-
length: usize,
315+
pub struct ArrowArraySlice(Arc<dyn ArrowArray>);
316+
317+
impl ArrayValue for ArrowArraySlice {
318+
type Column = Arc<dyn ArrowArray>;
319+
320+
fn size(&self) -> usize {
321+
self.0.len()
322+
}
323+
324+
fn elements(&self) -> Self::Column {
325+
self.0.clone()
326+
}
327+
}
328+
329+
pub struct ArrowMapValue {
330+
keys: Arc<dyn ArrowArray>,
331+
values: Arc<dyn ArrowArray>,
253332
}
254333

255-
334+
impl MapValue for ArrowMapValue {
335+
type Column = Arc<dyn ArrowArray>;
336+
337+
fn size(&self) -> usize {
338+
self.keys.len()
339+
}
340+
341+
fn keys(&self) -> Self::Column {
342+
self.keys.clone()
343+
}
344+
345+
fn values(&self) -> Self::Column {
346+
self.values.clone()
347+
}
348+
}
256349
}

0 commit comments

Comments
 (0)