Skip to content

Commit

Permalink
nano-arrow just warnings left
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 14, 2023
1 parent beeb6e6 commit 6d95b29
Show file tree
Hide file tree
Showing 30 changed files with 289 additions and 275 deletions.
4 changes: 2 additions & 2 deletions crates/nano-arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ futures = { workspace = true, optional = true }
async-stream = { version = "0.3.2", optional = true }

# avro support
avro-schema = { version = "0.3", optional = true }
avro-schema = { workspace = true, optional = true }

# for division/remainder optimization at runtime
strength_reduce = { version = "0.2", optional = true }
Expand Down Expand Up @@ -150,7 +150,7 @@ io_parquet_brotli = ["parquet2/brotli"]
# parquet bloom filter functions
io_parquet_bloom_filter = ["parquet2/bloom_filter"]

io_avro = ["avro-schema"]
io_avro = ["avro-schema", "polars-error/avro-schema"]
io_avro_compression = [
"avro-schema/compression",
]
Expand Down
10 changes: 1 addition & 9 deletions crates/nano-arrow/src/io/avro/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,6 @@
pub use avro_schema;

impl From<avro_schema::error::Error> for crate::error::Error {
fn from(error: avro_schema::error::Error) -> Self {
Self::ExternalFormat(error.to_string())
}
}

pub mod read;
pub mod write;

Expand All @@ -21,9 +15,7 @@ macro_rules! avro_decode {
loop {
if j > 9 {
// if j * 7 > 64
return Err(Error::ExternalFormat(
"zigzag decoding failed - corrupt avro file".to_string(),
));
polars_error::polars_bail!(oos = "zigzag decoding failed - corrupt avro file")
}
$reader.read_exact(&mut buf[..])$($_await)*?;
i |= (u64::from(buf[0] & 0x7F)) << (j * 7);
Expand Down
48 changes: 24 additions & 24 deletions crates/nano-arrow/src/io/avro/read/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@ use std::convert::TryInto;

use avro_schema::file::Block;
use avro_schema::schema::{Enum, Field as AvroField, Record, Schema as AvroSchema};
use polars_error::{polars_bail, polars_err, PolarsResult};

use super::nested::*;
use super::util;
use crate::array::*;
use crate::chunk::Chunk;
use crate::datatypes::*;
use crate::error::{Error, Result};
use crate::types::months_days_ns;
use crate::with_match_primitive_type;

fn make_mutable(
data_type: &DataType,
avro_field: Option<&AvroSchema>,
capacity: usize,
) -> Result<Box<dyn MutableArray>> {
) -> PolarsResult<Box<dyn MutableArray>> {
Ok(match data_type.to_physical_type() {
PhysicalType::Boolean => {
Box::new(MutableBooleanArray::with_capacity(capacity)) as Box<dyn MutableArray>
Expand Down Expand Up @@ -57,14 +57,14 @@ fn make_mutable(
let values = fields
.iter()
.map(|field| make_mutable(field.data_type(), None, capacity))
.collect::<Result<Vec<_>>>()?;
.collect::<PolarsResult<Vec<_>>>()?;
Box::new(DynMutableStructArray::new(values, data_type.clone()))
as Box<dyn MutableArray>
},
other => {
return Err(Error::NotYetImplemented(format!(
polars_bail!(nyi =
"Deserializing type {other:#?} is still not implemented"
)))
)
},
},
})
Expand All @@ -83,7 +83,7 @@ fn deserialize_item<'a>(
is_nullable: bool,
avro_field: &AvroSchema,
mut block: &'a [u8],
) -> Result<&'a [u8]> {
) -> PolarsResult<&'a [u8]> {
if is_nullable {
let variant = util::zigzag_i64(&mut block)?;
let is_null_first = is_union_null_first(avro_field);
Expand All @@ -99,7 +99,7 @@ fn deserialize_value<'a>(
array: &mut dyn MutableArray,
avro_field: &AvroSchema,
mut block: &'a [u8],
) -> Result<&'a [u8]> {
) -> PolarsResult<&'a [u8]> {
let data_type = array.data_type();
match data_type {
DataType::List(inner) => {
Expand Down Expand Up @@ -250,18 +250,18 @@ fn deserialize_value<'a>(
let len = match avro_inner {
AvroSchema::Bytes(_) => {
util::zigzag_i64(&mut block)?.try_into().map_err(|_| {
Error::ExternalFormat(
"Avro format contains a non-usize number of bytes".to_string(),
polars_err!(oos =
"Avro format contains a non-usize number of bytes"
)
})?
},
AvroSchema::Fixed(b) => b.size,
_ => unreachable!(),
};
if len > 16 {
return Err(Error::ExternalFormat(
"Avro decimal bytes return more than 16 bytes".to_string(),
));
polars_bail!(oos =
"Avro decimal bytes return more than 16 bytes"
)
}
let mut bytes = [0u8; 16];
bytes[..len].copy_from_slice(&block[..len]);
Expand All @@ -277,8 +277,8 @@ fn deserialize_value<'a>(
},
PhysicalType::Utf8 => {
let len: usize = util::zigzag_i64(&mut block)?.try_into().map_err(|_| {
Error::ExternalFormat(
"Avro format contains a non-usize number of bytes".to_string(),
polars_err!(oos =
"Avro format contains a non-usize number of bytes"
)
})?;
let data = simdutf8::basic::from_utf8(&block[..len])?;
Expand All @@ -292,8 +292,8 @@ fn deserialize_value<'a>(
},
PhysicalType::Binary => {
let len: usize = util::zigzag_i64(&mut block)?.try_into().map_err(|_| {
Error::ExternalFormat(
"Avro format contains a non-usize number of bytes".to_string(),
polars_err!(oos =
"Avro format contains a non-usize number of bytes"
)
})?;
let data = &block[..len];
Expand Down Expand Up @@ -329,7 +329,7 @@ fn deserialize_value<'a>(
Ok(block)
}

fn skip_item<'a>(field: &Field, avro_field: &AvroSchema, mut block: &'a [u8]) -> Result<&'a [u8]> {
fn skip_item<'a>(field: &Field, avro_field: &AvroSchema, mut block: &'a [u8]) -> PolarsResult<&'a [u8]> {
if field.is_nullable {
let variant = util::zigzag_i64(&mut block)?;
let is_null_first = is_union_null_first(avro_field);
Expand Down Expand Up @@ -366,7 +366,7 @@ fn skip_item<'a>(field: &Field, avro_field: &AvroSchema, mut block: &'a [u8]) ->
.map(|bytes| {
bytes
.try_into()
.map_err(|_| Error::oos("Avro block size negative or too large"))
.map_err(|_| polars_err!(oos = "Avro block size negative or too large"))
})
.transpose()?;

Expand Down Expand Up @@ -431,8 +431,8 @@ fn skip_item<'a>(field: &Field, avro_field: &AvroSchema, mut block: &'a [u8]) ->
let len = match avro_inner {
AvroSchema::Bytes(_) => {
util::zigzag_i64(&mut block)?.try_into().map_err(|_| {
Error::ExternalFormat(
"Avro format contains a non-usize number of bytes".to_string(),
polars_err!(oos =
"Avro format contains a non-usize number of bytes"
)
})?
},
Expand All @@ -445,8 +445,8 @@ fn skip_item<'a>(field: &Field, avro_field: &AvroSchema, mut block: &'a [u8]) ->
},
PhysicalType::Utf8 | PhysicalType::Binary => {
let len: usize = util::zigzag_i64(&mut block)?.try_into().map_err(|_| {
Error::ExternalFormat(
"Avro format contains a non-usize number of bytes".to_string(),
polars_err!(oos =
"Avro format contains a non-usize number of bytes"
)
})?;
block = &block[len..];
Expand Down Expand Up @@ -478,7 +478,7 @@ pub fn deserialize(
fields: &[Field],
avro_fields: &[AvroField],
projection: &[bool],
) -> Result<Chunk<Box<dyn Array>>> {
) -> PolarsResult<Chunk<Box<dyn Array>>> {
assert_eq!(fields.len(), avro_fields.len());
assert_eq!(fields.len(), projection.len());

Expand All @@ -498,7 +498,7 @@ pub fn deserialize(
make_mutable(&DataType::Int32, None, 0)
}
})
.collect::<Result<_>>()?;
.collect::<PolarsResult<_>>()?;

// this is _the_ expensive transpose (rows -> columns)
for _ in 0..rows {
Expand Down
5 changes: 3 additions & 2 deletions crates/nano-arrow/src/io/avro/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use avro_schema::schema::Field as AvroField;

mod deserialize;
pub use deserialize::deserialize;
use polars_error::PolarsResult;

mod nested;
mod schema;
mod util;
Expand All @@ -17,7 +19,6 @@ pub use schema::infer_schema;
use crate::array::Array;
use crate::chunk::Chunk;
use crate::datatypes::Field;
use crate::error::Result;

/// Single threaded, blocking reader of Avro; [`Iterator`] of [`Chunk`].
pub struct Reader<R: Read> {
Expand Down Expand Up @@ -52,7 +53,7 @@ impl<R: Read> Reader<R> {
}

impl<R: Read> Iterator for Reader<R> {
type Item = Result<Chunk<Box<dyn Array>>>;
type Item = PolarsResult<Chunk<Box<dyn Array>>>;

fn next(&mut self) -> Option<Self::Item> {
let fields = &self.fields[..];
Expand Down
8 changes: 4 additions & 4 deletions crates/nano-arrow/src/io/avro/read/nested.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use polars_error::{polars_err, PolarsResult};
use crate::array::*;
use crate::bitmap::*;
use crate::datatypes::*;
use crate::error::*;
use crate::offset::{Offset, Offsets};

/// Auxiliary struct
Expand Down Expand Up @@ -31,10 +31,10 @@ impl<O: Offset> DynMutableListArray<O> {
}

#[inline]
pub fn try_push_valid(&mut self) -> Result<()> {
pub fn try_push_valid(&mut self) -> PolarsResult<()> {
let total_length = self.values.len();
let offset = self.offsets.last().to_usize();
let length = total_length.checked_sub(offset).ok_or(Error::Overflow)?;
let length = total_length.checked_sub(offset).ok_or(polars_err!(ComputeError: "overflow"))?;

self.offsets.try_push(length)?;
if let Some(validity) = &mut self.validity {
Expand Down Expand Up @@ -227,7 +227,7 @@ impl DynMutableStructArray {
}

#[inline]
pub fn try_push_valid(&mut self) -> Result<()> {
pub fn try_push_valid(&mut self) -> PolarsResult<()> {
if let Some(validity) = &mut self.validity {
validity.push(true)
}
Expand Down
16 changes: 8 additions & 8 deletions crates/nano-arrow/src/io/avro/read/schema.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use avro_schema::schema::{Enum, Fixed, Record, Schema as AvroSchema};
use polars_error::{polars_bail, PolarsResult};

use crate::datatypes::*;
use crate::error::{Error, Result};

fn external_props(schema: &AvroSchema) -> Metadata {
let mut props = Metadata::new();
Expand All @@ -21,7 +21,7 @@ fn external_props(schema: &AvroSchema) -> Metadata {

/// Infers an [`Schema`] from the root [`Record`].
/// This
pub fn infer_schema(record: &Record) -> Result<Schema> {
pub fn infer_schema(record: &Record) -> PolarsResult<Schema> {
Ok(record
.fields
.iter()
Expand All @@ -32,11 +32,11 @@ pub fn infer_schema(record: &Record) -> Result<Schema> {
external_props(&field.schema),
)
})
.collect::<Result<Vec<_>>>()?
.collect::<PolarsResult<Vec<_>>>()?
.into())
}

fn schema_to_field(schema: &AvroSchema, name: Option<&str>, props: Metadata) -> Result<Field> {
fn schema_to_field(schema: &AvroSchema, name: Option<&str>, props: Metadata) -> PolarsResult<Field> {
let mut nullable = false;
let data_type = match schema {
AvroSchema::Null => DataType::Null,
Expand Down Expand Up @@ -94,15 +94,15 @@ fn schema_to_field(schema: &AvroSchema, name: Option<&str>, props: Metadata) ->
{
schema_to_field(schema, None, Metadata::default())?.data_type
} else {
return Err(Error::NotYetImplemented(format!(
polars_bail!(nyi =
"Can't read avro union {schema:?}"
)));
);
}
} else {
let fields = schemas
.iter()
.map(|s| schema_to_field(s, None, Metadata::default()))
.collect::<Result<Vec<Field>>>()?;
.collect::<PolarsResult<Vec<Field>>>()?;
DataType::Union(fields, None, UnionMode::Dense)
}
},
Expand All @@ -116,7 +116,7 @@ fn schema_to_field(schema: &AvroSchema, name: Option<&str>, props: Metadata) ->
}
schema_to_field(&field.schema, Some(&field.name), props)
})
.collect::<Result<_>>()?;
.collect::<PolarsResult<_>>()?;
DataType::Struct(fields)
},
AvroSchema::Enum { .. } => {
Expand Down
16 changes: 8 additions & 8 deletions crates/nano-arrow/src/io/avro/write/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@ use avro_schema::schema::{
BytesLogical, Field as AvroField, Fixed, FixedLogical, IntLogical, LongLogical, Record,
Schema as AvroSchema,
};
use polars_error::{polars_bail, PolarsResult};

use crate::datatypes::*;
use crate::error::{Error, Result};

/// Converts a [`Schema`] to an Avro [`Record`].
pub fn to_record(schema: &Schema) -> Result<Record> {
pub fn to_record(schema: &Schema) -> PolarsResult<Record> {
let mut name_counter: i32 = 0;
let fields = schema
.fields
.iter()
.map(|f| field_to_field(f, &mut name_counter))
.collect::<Result<_>>()?;
.collect::<PolarsResult<_>>()?;
Ok(Record {
name: "".to_string(),
namespace: None,
Expand All @@ -23,7 +23,7 @@ pub fn to_record(schema: &Schema) -> Result<Record> {
})
}

fn field_to_field(field: &Field, name_counter: &mut i32) -> Result<AvroField> {
fn field_to_field(field: &Field, name_counter: &mut i32) -> PolarsResult<AvroField> {
let schema = type_to_schema(field.data_type(), field.is_nullable, name_counter)?;
Ok(AvroField::new(&field.name, schema))
}
Expand All @@ -32,7 +32,7 @@ fn type_to_schema(
data_type: &DataType,
is_nullable: bool,
name_counter: &mut i32,
) -> Result<AvroSchema> {
) -> PolarsResult<AvroSchema> {
Ok(if is_nullable {
AvroSchema::Union(vec![
AvroSchema::Null,
Expand All @@ -48,7 +48,7 @@ fn _get_field_name(name_counter: &mut i32) -> String {
format!("r{name_counter}")
}

fn _type_to_schema(data_type: &DataType, name_counter: &mut i32) -> Result<AvroSchema> {
fn _type_to_schema(data_type: &DataType, name_counter: &mut i32) -> PolarsResult<AvroSchema> {
Ok(match data_type.to_logical_type() {
DataType::Null => AvroSchema::Null,
DataType::Boolean => AvroSchema::Boolean,
Expand All @@ -68,7 +68,7 @@ fn _type_to_schema(data_type: &DataType, name_counter: &mut i32) -> Result<AvroS
fields
.iter()
.map(|f| field_to_field(f, name_counter))
.collect::<Result<Vec<_>>>()?,
.collect::<PolarsResult<Vec<_>>>()?,
)),
DataType::Date32 => AvroSchema::Int(Some(IntLogical::Date)),
DataType::Time32(TimeUnit::Millisecond) => AvroSchema::Int(Some(IntLogical::Time)),
Expand All @@ -86,6 +86,6 @@ fn _type_to_schema(data_type: &DataType, name_counter: &mut i32) -> Result<AvroS
},
DataType::FixedSizeBinary(size) => AvroSchema::Fixed(Fixed::new("", *size)),
DataType::Decimal(p, s) => AvroSchema::Bytes(Some(BytesLogical::Decimal(*p, *s))),
other => return Err(Error::NotYetImplemented(format!("write {other:?} to avro"))),
other => polars_bail!(nyi = "write {other:?} to avro")
})
}
Loading

0 comments on commit 6d95b29

Please sign in to comment.