Skip to content

Commit

Permalink
Support for writing BooleanArrays
Browse files Browse the repository at this point in the history
  • Loading branch information
Jefffrey committed Aug 22, 2024
1 parent df77bd5 commit 95df0e5
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 6 deletions.
14 changes: 12 additions & 2 deletions src/arrow_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,10 @@ fn serialize_schema(schema: &SchemaRef) -> Vec<proto::Type> {
kind: Some(proto::r#type::Kind::Binary.into()),
..Default::default()
},
ArrowDataType::Boolean => proto::Type {
kind: Some(proto::r#type::Kind::Boolean.into()),
..Default::default()
},
// TODO: support more types
_ => unimplemented!("unsupported datatype"),
};
Expand Down Expand Up @@ -259,8 +263,9 @@ mod tests {

use arrow::{
array::{
Array, BinaryArray, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
Int8Array, LargeBinaryArray, LargeStringArray, RecordBatchReader, StringArray,
Array, BinaryArray, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array,
Int64Array, Int8Array, LargeBinaryArray, LargeStringArray, RecordBatchReader,
StringArray,
},
compute::concat_batches,
datatypes::{DataType as ArrowDataType, Field, Schema},
Expand Down Expand Up @@ -312,6 +317,9 @@ mod tests {
"".as_bytes(),
"123".as_bytes(),
]));
let boolean_array = Arc::new(BooleanArray::from(vec![
true, false, true, false, true, true, false,
]));
let schema = Schema::new(vec![
Field::new("f32", ArrowDataType::Float32, false),
Field::new("f64", ArrowDataType::Float64, false),
Expand All @@ -321,6 +329,7 @@ mod tests {
Field::new("int64", ArrowDataType::Int64, false),
Field::new("utf8", ArrowDataType::Utf8, false),
Field::new("binary", ArrowDataType::Binary, false),
Field::new("boolean", ArrowDataType::Boolean, false),
]);

let batch = RecordBatch::try_new(
Expand All @@ -334,6 +343,7 @@ mod tests {
int64_array,
utf8_array,
binary_array,
boolean_array,
],
)
.unwrap();
Expand Down
13 changes: 12 additions & 1 deletion src/encoding/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

use std::io::Read;

use arrow::{array::BooleanBufferBuilder, buffer::NullBuffer};
use arrow::{
array::BooleanBufferBuilder,
buffer::{BooleanBuffer, NullBuffer},
};
use bytes::Bytes;

use crate::{error::Result, memory::EstimateMemory};
Expand Down Expand Up @@ -98,6 +101,10 @@ impl BooleanEncoder {

/// Append the validity bits held by `null_buffer` to this encoder.
///
/// Delegates to [`Self::extend_bb`] using the null buffer's inner boolean buffer.
pub fn extend(&mut self, null_buffer: &NullBuffer) {
    self.extend_bb(null_buffer.inner());
}

/// Append every bit of the boolean buffer `bb` to the internal builder.
pub fn extend_bb(&mut self, bb: &BooleanBuffer) {
self.builder.append_buffer(bb);
}

Expand All @@ -106,6 +113,10 @@ impl BooleanEncoder {
self.builder.append_n(n, true);
}

/// Append the single boolean value `b` to the internal builder.
pub fn extend_boolean(&mut self, b: bool) {
self.builder.append(b);
}

/// Produce ORC present stream bytes and reset internal builder.
pub fn finish(&mut self) -> Bytes {
// TODO: don't throw away allocation?
Expand Down
93 changes: 93 additions & 0 deletions src/writer/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,99 @@ impl<T: ArrowPrimitiveType, E: PrimitiveValueEncoder<T::Native>> ColumnStripeEnc
}
}

/// Column encoder for boolean arrays. Values are written to a data stream and,
/// once an array carrying nulls has been seen, validity is written to a present stream.
pub struct BooleanColumnEncoder {
/// Encoder holding the non-null values; its bytes become the Data stream on finish.
encoder: BooleanEncoder,
/// Lazily initialized once we encounter an [`Array`] with a [`NullBuffer`].
present: Option<BooleanEncoder>,
/// Running count of non-null values encoded since the last finish; used to backfill
/// the present encoder when it is first created.
encoded_count: usize,
}

impl BooleanColumnEncoder {
    /// Create an empty encoder: no values encoded yet and no present stream allocated.
    pub fn new() -> Self {
        let encoder = BooleanEncoder::new();
        Self {
            encoder,
            // Only allocated once an array with a null buffer is encoded.
            present: None,
            encoded_count: 0,
        }
    }
}

impl EstimateMemory for BooleanColumnEncoder {
    /// Sum of the data encoder's estimate and, if it has been created, the present
    /// encoder's estimate.
    fn estimate_memory_size(&self) -> usize {
        let data_size = self.encoder.estimate_memory_size();
        let present_size = self
            .present
            .as_ref()
            .map_or(0, |present| present.estimate_memory_size());
        data_size + present_size
    }
}

impl ColumnStripeEncoder for BooleanColumnEncoder {
    /// Append the contents of a boolean `array` to the in-progress stripe.
    ///
    /// Only non-null values are written to the data encoder; validity bits go to the
    /// present encoder, which is created on first sight of an array with a null buffer.
    fn encode_array(&mut self, array: &ArrayRef) -> Result<()> {
        // TODO: return as result instead of panicking here?
        let array = array.as_boolean();
        match array.nulls() {
            Some(null_buffer) => {
                // Arrays from earlier record batches may not have carried a null buffer.
                // If this is the first one that does, create the present encoder and
                // backfill it with "present" bits for every value encoded so far.
                let already_encoded = self.encoded_count;
                let present = self.present.get_or_insert_with(|| {
                    let mut backfilled = BooleanEncoder::new();
                    backfilled.extend_present(already_encoded);
                    backfilled
                });
                present.extend(null_buffer);
                // Copy only the values marked valid by the null buffer.
                for value in null_buffer.valid_indices().map(|index| array.value(index)) {
                    self.encoder.extend_boolean(value);
                }
            }
            None => {
                // No nulls: copy the values buffer wholesale and, if a present encoder
                // already exists, mark every row of this array as present.
                self.encoder.extend_bb(array.values());
                if let Some(present) = &mut self.present {
                    present.extend_present(array.len());
                }
            }
        }
        let non_null_count = array.len() - array.null_count();
        self.encoded_count += non_null_count;
        Ok(())
    }

    /// Boolean columns always report the direct column encoding.
    fn column_encoding(&self) -> ColumnEncoding {
        ColumnEncoding::Direct
    }

    /// Emit the streams for the current stripe and reset the encoded-value counter.
    ///
    /// The Data stream is always returned first; a Present stream follows only if the
    /// present encoder has been created.
    fn finish(&mut self) -> Vec<Stream> {
        let mut streams = vec![Stream {
            kind: StreamType::Data,
            bytes: self.encoder.finish(),
        }];
        self.encoded_count = 0;
        if let Some(present) = self.present.as_mut() {
            streams.push(Stream {
                kind: StreamType::Present,
                bytes: present.finish(),
            });
        }
        streams
    }
}

/// Direct encodes binary/strings.
pub struct GenericBinaryColumnEncoder<T: ByteArrayType>
where
Expand Down
7 changes: 4 additions & 3 deletions src/writer/stripe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ use crate::memory::EstimateMemory;
use crate::proto;

use super::column::{
BinaryColumnEncoder, ByteColumnEncoder, ColumnStripeEncoder, DoubleColumnEncoder,
FloatColumnEncoder, Int16ColumnEncoder, Int32ColumnEncoder, Int64ColumnEncoder,
LargeBinaryColumnEncoder, LargeStringColumnEncoder, StringColumnEncoder,
BinaryColumnEncoder, BooleanColumnEncoder, ByteColumnEncoder, ColumnStripeEncoder,
DoubleColumnEncoder, FloatColumnEncoder, Int16ColumnEncoder, Int32ColumnEncoder,
Int64ColumnEncoder, LargeBinaryColumnEncoder, LargeStringColumnEncoder, StringColumnEncoder,
};
use super::{ColumnEncoding, StreamType};

Expand Down Expand Up @@ -182,6 +182,7 @@ fn create_encoder(field: &FieldRef) -> Box<dyn ColumnStripeEncoder> {
ArrowDataType::LargeUtf8 => Box::new(LargeStringColumnEncoder::new()),
ArrowDataType::Binary => Box::new(BinaryColumnEncoder::new()),
ArrowDataType::LargeBinary => Box::new(LargeBinaryColumnEncoder::new()),
ArrowDataType::Boolean => Box::new(BooleanColumnEncoder::new()),
// TODO: support more datatypes
_ => unimplemented!("unsupported datatype"),
}
Expand Down

0 comments on commit 95df0e5

Please sign in to comment.