Skip to content

Commit d72be3d

Browse files
committed
change IPC format to store buffer_index directly
1 parent 9e83121 commit d72be3d

File tree

8 files changed

+48
-43
lines changed

8 files changed

+48
-43
lines changed

vortex-array/src/array/chunked/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,9 @@ impl ChunkedArray {
7272

7373
#[inline]
7474
pub fn chunk(&self, idx: usize) -> VortexResult<Array> {
75-
assert!(idx < self.nchunks());
75+
if idx >= self.nchunks() {
76+
vortex_bail!("chunk index {} > num chunks ({})", idx, self.nchunks());
77+
}
7678

7779
let chunk_start = usize::try_from(&scalar_at_unchecked(&self.chunk_offsets(), idx))?;
7880
let chunk_end = usize::try_from(&scalar_at_unchecked(&self.chunk_offsets(), idx + 1))?;

vortex-array/src/lib.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,15 @@ impl Array {
121121
ArrayChildrenIterator::new(self.clone())
122122
}
123123

124+
/// Count the total number of buffers held by this array and all of its descendants.
125+
pub fn cumulative_nbuffers(&self) -> usize {
126+
self.children()
127+
.iter()
128+
.map(|child| child.cumulative_nbuffers())
129+
.sum::<usize>()
130+
+ if self.buffer().is_some() { 1 } else { 0 }
131+
}
132+
124133
/// Return the buffer offsets and the total length of all buffers, assuming the given alignment.
125134
/// This includes all child buffers.
126135
pub fn all_buffer_offsets(&self, alignment: usize) -> Vec<u64> {

vortex-array/src/view.rs

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -131,28 +131,13 @@ impl ArrayView {
131131
Box::leak(Box::new(OpaqueEncoding(child.encoding())))
132132
});
133133

134-
// Figure out how many buffers to skip...
135-
// We store them depth-first.
136-
let buffer_offset = self
137-
.flatbuffer()
138-
.children()
139-
.ok_or_else(|| vortex_err!("flatbuffer children not found"))?
140-
.iter()
141-
.take(idx)
142-
.map(|child| Self::cumulative_nbuffers(child))
143-
.sum::<usize>()
144-
+ self.has_buffer() as usize;
145-
let buffer_count = Self::cumulative_nbuffers(child);
146-
147134
Ok(Self {
148135
encoding,
149136
dtype: dtype.clone(),
150137
len,
151138
flatbuffer: self.flatbuffer.clone(),
152139
flatbuffer_loc,
153-
buffers: self
154-
.buffers
155-
.slice(buffer_offset, buffer_offset + buffer_count),
140+
buffers: self.buffers.clone(),
156141
ctx: self.ctx.clone(),
157142
})
158143
}
@@ -176,20 +161,22 @@ impl ArrayView {
176161

177162
/// Whether the current Array makes use of a buffer
178163
pub fn has_buffer(&self) -> bool {
179-
self.flatbuffer().has_buffer()
164+
self.flatbuffer().buffer_index().is_some()
180165
}
181166

182167
/// The number of buffers used by the current Array and all its children.
183168
fn cumulative_nbuffers(array: fb::Array) -> usize {
184-
let mut nbuffers = if array.has_buffer() { 1 } else { 0 };
169+
let mut nbuffers = if array.buffer_index().is_some() { 1 } else { 0 };
185170
for child in array.children().unwrap_or_default() {
186171
nbuffers += Self::cumulative_nbuffers(child)
187172
}
188173
nbuffers
189174
}
190175

191176
pub fn buffer(&self) -> Option<&Buffer> {
192-
self.has_buffer().then(|| &self.buffers[0])
177+
self.flatbuffer()
178+
.buffer_index()
179+
.map(|idx| &self.buffers[idx as usize])
193180
}
194181

195182
pub fn statistics(&self) -> &dyn Statistics {

vortex-flatbuffers/flatbuffers/vortex-array/array.fbs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ enum Version: uint8 {
66

77
table Array {
88
version: Version = V0;
9-
has_buffer: bool;
9+
buffer_index: uint64 = null;
1010
encoding: uint16;
1111
metadata: [ubyte];
1212
stats: ArrayStats;

vortex-flatbuffers/src/generated/array.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ impl<'a> flatbuffers::Verifiable for Version {
9393

9494
impl flatbuffers::SimpleToVerifyInSlice for Version {}
9595
pub enum ArrayOffset {}
96-
#[derive(Copy, Clone, PartialEq, Eq)]
96+
#[derive(Copy, Clone, PartialEq)]
9797

9898
pub struct Array<'a> {
9999
pub _tab: flatbuffers::Table<'a>,
@@ -109,7 +109,7 @@ impl<'a> flatbuffers::Follow<'a> for Array<'a> {
109109

110110
impl<'a> Array<'a> {
111111
pub const VT_VERSION: flatbuffers::VOffsetT = 4;
112-
pub const VT_HAS_BUFFER: flatbuffers::VOffsetT = 6;
112+
pub const VT_BUFFER_INDEX: flatbuffers::VOffsetT = 6;
113113
pub const VT_ENCODING: flatbuffers::VOffsetT = 8;
114114
pub const VT_METADATA: flatbuffers::VOffsetT = 10;
115115
pub const VT_STATS: flatbuffers::VOffsetT = 12;
@@ -125,11 +125,11 @@ impl<'a> Array<'a> {
125125
args: &'args ArrayArgs<'args>
126126
) -> flatbuffers::WIPOffset<Array<'bldr>> {
127127
let mut builder = ArrayBuilder::new(_fbb);
128+
if let Some(x) = args.buffer_index { builder.add_buffer_index(x); }
128129
if let Some(x) = args.children { builder.add_children(x); }
129130
if let Some(x) = args.stats { builder.add_stats(x); }
130131
if let Some(x) = args.metadata { builder.add_metadata(x); }
131132
builder.add_encoding(args.encoding);
132-
builder.add_has_buffer(args.has_buffer);
133133
builder.add_version(args.version);
134134
builder.finish()
135135
}
@@ -143,11 +143,11 @@ impl<'a> Array<'a> {
143143
unsafe { self._tab.get::<Version>(Array::VT_VERSION, Some(Version::V0)).unwrap()}
144144
}
145145
#[inline]
146-
pub fn has_buffer(&self) -> bool {
146+
pub fn buffer_index(&self) -> Option<u64> {
147147
// Safety:
148148
// Created from valid Table for this object
149149
// which contains a valid value in this slot
150-
unsafe { self._tab.get::<bool>(Array::VT_HAS_BUFFER, Some(false)).unwrap()}
150+
unsafe { self._tab.get::<u64>(Array::VT_BUFFER_INDEX, None)}
151151
}
152152
#[inline]
153153
pub fn encoding(&self) -> u16 {
@@ -187,7 +187,7 @@ impl flatbuffers::Verifiable for Array<'_> {
187187
use self::flatbuffers::Verifiable;
188188
v.visit_table(pos)?
189189
.visit_field::<Version>("version", Self::VT_VERSION, false)?
190-
.visit_field::<bool>("has_buffer", Self::VT_HAS_BUFFER, false)?
190+
.visit_field::<u64>("buffer_index", Self::VT_BUFFER_INDEX, false)?
191191
.visit_field::<u16>("encoding", Self::VT_ENCODING, false)?
192192
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("metadata", Self::VT_METADATA, false)?
193193
.visit_field::<flatbuffers::ForwardsUOffset<ArrayStats>>("stats", Self::VT_STATS, false)?
@@ -198,7 +198,7 @@ impl flatbuffers::Verifiable for Array<'_> {
198198
}
199199
pub struct ArrayArgs<'a> {
200200
pub version: Version,
201-
pub has_buffer: bool,
201+
pub buffer_index: Option<u64>,
202202
pub encoding: u16,
203203
pub metadata: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
204204
pub stats: Option<flatbuffers::WIPOffset<ArrayStats<'a>>>,
@@ -209,7 +209,7 @@ impl<'a> Default for ArrayArgs<'a> {
209209
fn default() -> Self {
210210
ArrayArgs {
211211
version: Version::V0,
212-
has_buffer: false,
212+
buffer_index: None,
213213
encoding: 0,
214214
metadata: None,
215215
stats: None,
@@ -228,8 +228,8 @@ impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> ArrayBuilder<'a, 'b, A> {
228228
self.fbb_.push_slot::<Version>(Array::VT_VERSION, version, Version::V0);
229229
}
230230
#[inline]
231-
pub fn add_has_buffer(&mut self, has_buffer: bool) {
232-
self.fbb_.push_slot::<bool>(Array::VT_HAS_BUFFER, has_buffer, false);
231+
pub fn add_buffer_index(&mut self, buffer_index: u64) {
232+
self.fbb_.push_slot_always::<u64>(Array::VT_BUFFER_INDEX, buffer_index);
233233
}
234234
#[inline]
235235
pub fn add_encoding(&mut self, encoding: u16) {
@@ -266,7 +266,7 @@ impl core::fmt::Debug for Array<'_> {
266266
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
267267
let mut ds = f.debug_struct("Array");
268268
ds.field("version", &self.version());
269-
ds.field("has_buffer", &self.has_buffer());
269+
ds.field("buffer_index", &self.buffer_index());
270270
ds.field("encoding", &self.encoding());
271271
ds.field("metadata", &self.metadata());
272272
ds.field("stats", &self.stats());
@@ -275,7 +275,7 @@ impl core::fmt::Debug for Array<'_> {
275275
}
276276
}
277277
pub enum ArrayStatsOffset {}
278-
#[derive(Copy, Clone, PartialEq, Eq)]
278+
#[derive(Copy, Clone, PartialEq)]
279279

280280
pub struct ArrayStats<'a> {
281281
pub _tab: flatbuffers::Table<'a>,

vortex-flatbuffers/src/generated/dtype.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ impl flatbuffers::SimpleToVerifyInSlice for Type {}
250250
pub struct TypeUnionTableOffset {}
251251

252252
pub enum NullOffset {}
253-
#[derive(Copy, Clone, PartialEq, Eq)]
253+
#[derive(Copy, Clone, PartialEq)]
254254

255255
pub struct Null<'a> {
256256
pub _tab: flatbuffers::Table<'a>,
@@ -329,7 +329,7 @@ impl core::fmt::Debug for Null<'_> {
329329
}
330330
}
331331
pub enum BoolOffset {}
332-
#[derive(Copy, Clone, PartialEq, Eq)]
332+
#[derive(Copy, Clone, PartialEq)]
333333

334334
pub struct Bool<'a> {
335335
pub _tab: flatbuffers::Table<'a>,
@@ -540,7 +540,7 @@ impl core::fmt::Debug for Primitive<'_> {
540540
}
541541
}
542542
pub enum DecimalOffset {}
543-
#[derive(Copy, Clone, PartialEq, Eq)]
543+
#[derive(Copy, Clone, PartialEq)]
544544

545545
pub struct Decimal<'a> {
546546
pub _tab: flatbuffers::Table<'a>,
@@ -673,7 +673,7 @@ impl core::fmt::Debug for Decimal<'_> {
673673
}
674674
}
675675
pub enum Utf8Offset {}
676-
#[derive(Copy, Clone, PartialEq, Eq)]
676+
#[derive(Copy, Clone, PartialEq)]
677677

678678
pub struct Utf8<'a> {
679679
pub _tab: flatbuffers::Table<'a>,
@@ -770,7 +770,7 @@ impl core::fmt::Debug for Utf8<'_> {
770770
}
771771
}
772772
pub enum BinaryOffset {}
773-
#[derive(Copy, Clone, PartialEq, Eq)]
773+
#[derive(Copy, Clone, PartialEq)]
774774

775775
pub struct Binary<'a> {
776776
pub _tab: flatbuffers::Table<'a>,

vortex-flatbuffers/src/generated/message.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
// @generated
55

66
use crate::scalar::*;
7-
use crate::array::*;
87
use crate::dtype::*;
8+
use crate::array::*;
99
use core::mem;
1010
use core::cmp::Ordering;
1111

vortex-serde/src/messages.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub enum IPCMessage<'a> {
1818

1919
pub struct IPCSchema<'a>(pub &'a DType);
2020
pub struct IPCBatch<'a>(pub &'a Array);
21-
pub struct IPCArray<'a>(pub &'a Array);
21+
pub struct IPCArray<'a>(pub &'a Array, usize);
2222
pub struct IPCPage<'a>(pub &'a Buffer);
2323

2424
pub struct IPCDType(pub DType);
@@ -87,13 +87,15 @@ impl<'a> WriteFlatBuffer for IPCBatch<'a> {
8787
fbb: &mut FlatBufferBuilder<'fb>,
8888
) -> WIPOffset<Self::Target<'fb>> {
8989
let array_data = self.0;
90-
let array = Some(IPCArray(array_data).write_flatbuffer(fbb));
90+
let array = Some(IPCArray(array_data, 0).write_flatbuffer(fbb));
9191

9292
let length = array_data.len() as u64;
9393

9494
// Walk the ColumnData depth-first to compute the buffer offsets.
9595
let mut buffers = vec![];
9696
let mut offset = 0;
97+
98+
// Each array now records its buffer index directly, replacing the old has_buffer flag.
9799
for array_data in array_data.depth_first_traversal() {
98100
if let Some(buffer) = array_data.buffer() {
99101
let aligned_size = (buffer.len() + (ALIGNMENT - 1)) & !(ALIGNMENT - 1);
@@ -149,7 +151,12 @@ impl<'a> WriteFlatBuffer for IPCArray<'a> {
149151
let children = column_data
150152
.children()
151153
.iter()
152-
.map(|child| IPCArray(child).write_flatbuffer(fbb))
154+
.scan(self.1, |buffer_offset, child| {
155+
// Write the child at the current buffer offset, then advance the offset past the child's buffers.
156+
let msg = IPCArray(child, *buffer_offset).write_flatbuffer(fbb);
157+
*buffer_offset += child.cumulative_nbuffers();
158+
Some(msg)
159+
})
153160
.collect_vec();
154161
let children = Some(fbb.create_vector(&children));
155162

@@ -159,7 +166,7 @@ impl<'a> WriteFlatBuffer for IPCArray<'a> {
159166
fbb,
160167
&fba::ArrayArgs {
161168
version: Default::default(),
162-
has_buffer: column_data.buffer().is_some(),
169+
buffer_index: self.0.buffer().is_some().then_some(self.1 as u64),
163170
encoding,
164171
metadata,
165172
stats,

0 commit comments

Comments
 (0)