From 95df0e525cb00dc3e0fbb2e92b0e6a4d014931a3 Mon Sep 17 00:00:00 2001 From: Jefffrey Date: Thu, 22 Aug 2024 21:57:42 +1000 Subject: [PATCH] Support for writing BooleanArrays --- src/arrow_writer.rs | 14 ++++++- src/encoding/boolean.rs | 13 +++++- src/writer/column.rs | 93 +++++++++++++++++++++++++++++++++++++++++ src/writer/stripe.rs | 7 ++-- 4 files changed, 121 insertions(+), 6 deletions(-) diff --git a/src/arrow_writer.rs b/src/arrow_writer.rs index 3ca12d1c..77df540a 100644 --- a/src/arrow_writer.rs +++ b/src/arrow_writer.rs @@ -208,6 +208,10 @@ fn serialize_schema(schema: &SchemaRef) -> Vec { kind: Some(proto::r#type::Kind::Binary.into()), ..Default::default() }, + ArrowDataType::Boolean => proto::Type { + kind: Some(proto::r#type::Kind::Boolean.into()), + ..Default::default() + }, // TODO: support more types _ => unimplemented!("unsupported datatype"), }; @@ -259,8 +263,9 @@ mod tests { use arrow::{ array::{ - Array, BinaryArray, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, - Int8Array, LargeBinaryArray, LargeStringArray, RecordBatchReader, StringArray, + Array, BinaryArray, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array, + Int64Array, Int8Array, LargeBinaryArray, LargeStringArray, RecordBatchReader, + StringArray, }, compute::concat_batches, datatypes::{DataType as ArrowDataType, Field, Schema}, @@ -312,6 +317,9 @@ mod tests { "".as_bytes(), "123".as_bytes(), ])); + let boolean_array = Arc::new(BooleanArray::from(vec![ + true, false, true, false, true, true, false, + ])); let schema = Schema::new(vec![ Field::new("f32", ArrowDataType::Float32, false), Field::new("f64", ArrowDataType::Float64, false), @@ -321,6 +329,7 @@ mod tests { Field::new("int64", ArrowDataType::Int64, false), Field::new("utf8", ArrowDataType::Utf8, false), Field::new("binary", ArrowDataType::Binary, false), + Field::new("boolean", ArrowDataType::Boolean, false), ]); let batch = RecordBatch::try_new( @@ -334,6 +343,7 @@ mod tests { int64_array, utf8_array, binary_array, + boolean_array, ], ) .unwrap(); diff --git a/src/encoding/boolean.rs b/src/encoding/boolean.rs index d9e53a5b..b409ad3b 100644 --- a/src/encoding/boolean.rs +++ b/src/encoding/boolean.rs @@ -17,7 +17,10 @@ use std::io::Read; -use arrow::{array::BooleanBufferBuilder, buffer::NullBuffer}; +use arrow::{ + array::BooleanBufferBuilder, + buffer::{BooleanBuffer, NullBuffer}, +}; use bytes::Bytes; use crate::{error::Result, memory::EstimateMemory}; @@ -98,6 +101,10 @@ impl BooleanEncoder { pub fn extend(&mut self, null_buffer: &NullBuffer) { let bb = null_buffer.inner(); + self.extend_bb(bb); + } + + pub fn extend_bb(&mut self, bb: &BooleanBuffer) { self.builder.append_buffer(bb); } @@ -106,6 +113,10 @@ impl BooleanEncoder { self.builder.append_n(n, true); } + pub fn extend_boolean(&mut self, b: bool) { + self.builder.append(b); + } + /// Produce ORC present stream bytes and reset internal builder. pub fn finish(&mut self) -> Bytes { // TODO: don't throw away allocation? diff --git a/src/writer/column.rs b/src/writer/column.rs index 5737880d..c79c35b0 100644 --- a/src/writer/column.rs +++ b/src/writer/column.rs @@ -161,6 +161,99 @@ impl> ColumnStripeEnc } } +pub struct BooleanColumnEncoder { + encoder: BooleanEncoder, + /// Lazily initialized once we encounter an [`Array`] with a [`NullBuffer`]. + present: Option, + encoded_count: usize, +} + +impl BooleanColumnEncoder { + pub fn new() -> Self { + Self { + encoder: BooleanEncoder::new(), + present: None, + encoded_count: 0, + } + } +} + +impl EstimateMemory for BooleanColumnEncoder { + fn estimate_memory_size(&self) -> usize { + self.encoder.estimate_memory_size() + + self + .present + .as_ref() + .map(|p| p.estimate_memory_size()) + .unwrap_or(0) + } +} + +impl ColumnStripeEncoder for BooleanColumnEncoder { + fn encode_array(&mut self, array: &ArrayRef) -> Result<()> { + // TODO: return as result instead of panicking here? + let array = array.as_boolean(); + // Handling case where if encoding across RecordBatch boundaries, arrays + // might introduce a NullBuffer + match (array.nulls(), &mut self.present) { + // Need to copy only the valid values as indicated by null_buffer + (Some(null_buffer), Some(present)) => { + present.extend(null_buffer); + for index in null_buffer.valid_indices() { + let v = array.value(index); + self.encoder.extend_boolean(v); + } + } + (Some(null_buffer), None) => { + // Lazily initiate present buffer and ensure backfill the already encoded values + let mut present = BooleanEncoder::new(); + present.extend_present(self.encoded_count); + present.extend(null_buffer); + self.present = Some(present); + for index in null_buffer.valid_indices() { + let v = array.value(index); + self.encoder.extend_boolean(v); + } + } + // Simple direct copy from values buffer, extending present if needed + (None, _) => { + let values = array.values(); + self.encoder.extend_bb(values); + if let Some(present) = self.present.as_mut() { + present.extend_present(array.len()) + } + } + } + self.encoded_count += array.len() - array.null_count(); + Ok(()) + } + + fn column_encoding(&self) -> ColumnEncoding { + ColumnEncoding::Direct + } + + fn finish(&mut self) -> Vec { + let bytes = self.encoder.finish(); + // Return mandatory Data stream and optional Present stream + let data = Stream { + kind: StreamType::Data, + bytes, + }; + self.encoded_count = 0; + match &mut self.present { + Some(present) => { + let bytes = present.finish(); + let present = Stream { + kind: StreamType::Present, + bytes, + }; + vec![data, present] + } + None => vec![data], + } + } +} + /// Direct encodes binary/strings. pub struct GenericBinaryColumnEncoder where diff --git a/src/writer/stripe.rs b/src/writer/stripe.rs index 9321ea59..e16ee8e8 100644 --- a/src/writer/stripe.rs +++ b/src/writer/stripe.rs @@ -27,9 +27,9 @@ use crate::memory::EstimateMemory; use crate::proto; use super::column::{ - BinaryColumnEncoder, ByteColumnEncoder, ColumnStripeEncoder, DoubleColumnEncoder, - FloatColumnEncoder, Int16ColumnEncoder, Int32ColumnEncoder, Int64ColumnEncoder, - LargeBinaryColumnEncoder, LargeStringColumnEncoder, StringColumnEncoder, + BinaryColumnEncoder, BooleanColumnEncoder, ByteColumnEncoder, ColumnStripeEncoder, + DoubleColumnEncoder, FloatColumnEncoder, Int16ColumnEncoder, Int32ColumnEncoder, + Int64ColumnEncoder, LargeBinaryColumnEncoder, LargeStringColumnEncoder, StringColumnEncoder, }; use super::{ColumnEncoding, StreamType}; @@ -182,6 +182,7 @@ fn create_encoder(field: &FieldRef) -> Box { ArrowDataType::LargeUtf8 => Box::new(LargeStringColumnEncoder::new()), ArrowDataType::Binary => Box::new(BinaryColumnEncoder::new()), ArrowDataType::LargeBinary => Box::new(LargeBinaryColumnEncoder::new()), + ArrowDataType::Boolean => Box::new(BooleanColumnEncoder::new()), // TODO: support more datatypes _ => unimplemented!("unsupported datatype"), }