Skip to content

Commit

Permalink
more
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 14, 2023
1 parent 379428b commit beeb6e6
Show file tree
Hide file tree
Showing 30 changed files with 359 additions and 371 deletions.
14 changes: 5 additions & 9 deletions crates/nano-arrow/src/compute/concatenate.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Contains the concatenate kernel
//! Contains the concatenate kernel
//!
//! Example:
//!
Expand All @@ -13,26 +13,22 @@
//! assert_eq!(arr.len(), 3);
//! ```
use polars_error::{polars_bail, PolarsResult};
use crate::array::growable::make_growable;
use crate::array::Array;
use crate::bitmap::{Bitmap, MutableBitmap};
use crate::error::{Error, Result};

/// Concatenate multiple [`Array`]s of the same type into a single [`Array`].
pub fn concatenate(arrays: &[&dyn Array]) -> Result<Box<dyn Array>> {
pub fn concatenate(arrays: &[&dyn Array]) -> PolarsResult<Box<dyn Array>> {
if arrays.is_empty() {
return Err(Error::InvalidArgumentError(
"concat requires input of at least one array".to_string(),
));
polars_bail!(InvalidOperation: "concat requires input of at least one array")
}

if arrays
.iter()
.any(|array| array.data_type() != arrays[0].data_type())
{
return Err(Error::InvalidArgumentError(
"It is not possible to concatenate arrays of different data types.".to_string(),
));
polars_bail!(InvalidOperation: "It is not possible to concatenate arrays of different data types.")
}

let lengths = arrays.iter().map(|array| array.len()).collect::<Vec<_>>();
Expand Down
12 changes: 6 additions & 6 deletions crates/nano-arrow/src/compute/if_then_else.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Contains the operator [`if_then_else`].
use polars_error::{polars_bail, PolarsResult};
use crate::array::{growable, Array, BooleanArray};
use crate::bitmap::utils::SlicesIterator;
use crate::error::{Error, Result};

/// Returns the values from `lhs` if the predicate is `true` or from `rhs` if the predicate is `false`.
/// Returns `None` if the predicate is `None`.
Expand All @@ -27,21 +27,21 @@ pub fn if_then_else(
predicate: &BooleanArray,
lhs: &dyn Array,
rhs: &dyn Array,
) -> Result<Box<dyn Array>> {
) -> PolarsResult<Box<dyn Array>> {
if lhs.data_type() != rhs.data_type() {
return Err(Error::InvalidArgumentError(format!(
polars_bail!(InvalidOperation:
"If then else requires the arguments to have the same datatypes ({:?} != {:?})",
lhs.data_type(),
rhs.data_type()
)));
)
}
if (lhs.len() != rhs.len()) | (lhs.len() != predicate.len()) {
return Err(Error::InvalidArgumentError(format!(
polars_bail!(ComputeError:
"If then else requires all arguments to have the same length (predicate = {}, lhs = {}, rhs = {})",
predicate.len(),
lhs.len(),
rhs.len()
)));
);
}

let result = if predicate.null_count() > 0 {
Expand Down
40 changes: 16 additions & 24 deletions crates/nano-arrow/src/compute/temporal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
//! Defines temporal kernels for time and date related functions.
use chrono::{Datelike, Timelike};
use polars_error::PolarsResult;

use super::arity::unary;
use crate::array::*;
use crate::datatypes::*;
use crate::error::{Error, Result};
use crate::temporal_conversions::*;
use crate::types::NativeType;

Expand Down Expand Up @@ -65,46 +65,42 @@ macro_rules! date_like {
chrono_tz(array, *time_unit, timezone_str, |x| x.$extract())
}
},
dt => Err(Error::NotYetImplemented(format!(
"\"{}\" does not support type {:?}",
stringify!($extract),
dt
))),
_ => unimplemented!()
}
};
}

/// Extracts the years of a temporal array as [`PrimitiveArray<i32>`].
/// Use [`can_year`] to check if this operation is supported for the target [`DataType`].
pub fn year(array: &dyn Array) -> Result<PrimitiveArray<i32>> {
pub fn year(array: &dyn Array) -> PolarsResult<PrimitiveArray<i32>> {
date_like!(year, array, DataType::Int32)
}

/// Extracts the months of a temporal array as [`PrimitiveArray<u32>`].
/// Value ranges from 1 to 12.
/// Use [`can_month`] to check if this operation is supported for the target [`DataType`].
pub fn month(array: &dyn Array) -> Result<PrimitiveArray<u32>> {
pub fn month(array: &dyn Array) -> PolarsResult<PrimitiveArray<u32>> {
date_like!(month, array, DataType::UInt32)
}

/// Extracts the days of a temporal array as [`PrimitiveArray<u32>`].
/// Value ranges from 1 to 31 (Last day depends on month).
/// Use [`can_day`] to check if this operation is supported for the target [`DataType`].
pub fn day(array: &dyn Array) -> Result<PrimitiveArray<u32>> {
pub fn day(array: &dyn Array) -> PolarsResult<PrimitiveArray<u32>> {
date_like!(day, array, DataType::UInt32)
}

/// Extracts weekday of a temporal array as [`PrimitiveArray<u32>`].
/// Monday is 1, Tuesday is 2, ..., Sunday is 7.
/// Use [`can_weekday`] to check if this operation is supported for the target [`DataType`]
pub fn weekday(array: &dyn Array) -> Result<PrimitiveArray<u32>> {
pub fn weekday(array: &dyn Array) -> PolarsResult<PrimitiveArray<u32>> {
date_like!(u32_weekday, array, DataType::UInt32)
}

/// Extracts ISO week of a temporal array as [`PrimitiveArray<u32>`]
/// Value ranges from 1 to 53 (Last week depends on the year).
/// Use [`can_iso_week`] to check if this operation is supported for the target [`DataType`]
pub fn iso_week(array: &dyn Array) -> Result<PrimitiveArray<u32>> {
pub fn iso_week(array: &dyn Array) -> PolarsResult<PrimitiveArray<u32>> {
date_like!(u32_iso_week, array, DataType::UInt32)
}

Expand All @@ -128,43 +124,39 @@ macro_rules! time_like {
chrono_tz(array, *time_unit, timezone_str, |x| x.$extract())
}
},
dt => Err(Error::NotYetImplemented(format!(
"\"{}\" does not support type {:?}",
stringify!($extract),
dt
))),
_ => unimplemented!(),
}
};
}

/// Extracts the hours of a temporal array as [`PrimitiveArray<u32>`].
/// Value ranges from 0 to 23.
/// Use [`can_hour`] to check if this operation is supported for the target [`DataType`].
pub fn hour(array: &dyn Array) -> Result<PrimitiveArray<u32>> {
pub fn hour(array: &dyn Array) -> PolarsResult<PrimitiveArray<u32>> {
time_like!(hour, array, DataType::UInt32)
}

/// Extracts the minutes of a temporal array as [`PrimitiveArray<u32>`].
/// Value ranges from 0 to 59.
/// Use [`can_minute`] to check if this operation is supported for the target [`DataType`].
pub fn minute(array: &dyn Array) -> Result<PrimitiveArray<u32>> {
pub fn minute(array: &dyn Array) -> PolarsResult<PrimitiveArray<u32>> {
time_like!(minute, array, DataType::UInt32)
}

/// Extracts the seconds of a temporal array as [`PrimitiveArray<u32>`].
/// Value ranges from 0 to 59.
/// Use [`can_second`] to check if this operation is supported for the target [`DataType`].
pub fn second(array: &dyn Array) -> Result<PrimitiveArray<u32>> {
pub fn second(array: &dyn Array) -> PolarsResult<PrimitiveArray<u32>> {
time_like!(second, array, DataType::UInt32)
}

/// Extracts the nanoseconds of a temporal array as [`PrimitiveArray<u32>`].
/// Use [`can_nanosecond`] to check if this operation is supported for the target [`DataType`].
pub fn nanosecond(array: &dyn Array) -> Result<PrimitiveArray<u32>> {
pub fn nanosecond(array: &dyn Array) -> PolarsResult<PrimitiveArray<u32>> {
time_like!(nanosecond, array, DataType::UInt32)
}

fn date_variants<F, O>(array: &dyn Array, data_type: DataType, op: F) -> Result<PrimitiveArray<O>>
fn date_variants<F, O>(array: &dyn Array, data_type: DataType, op: F) -> PolarsResult<PrimitiveArray<O>>
where
O: NativeType,
F: Fn(chrono::NaiveDateTime) -> O,
Expand Down Expand Up @@ -201,7 +193,7 @@ where
}
}

fn time_variants<F, O>(array: &dyn Array, data_type: DataType, op: F) -> Result<PrimitiveArray<O>>
fn time_variants<F, O>(array: &dyn Array, data_type: DataType, op: F) -> PolarsResult<PrimitiveArray<O>>
where
O: NativeType,
F: Fn(chrono::NaiveTime) -> O,
Expand Down Expand Up @@ -245,7 +237,7 @@ fn chrono_tz<F, O>(
time_unit: TimeUnit,
timezone_str: &str,
op: F,
) -> Result<PrimitiveArray<O>>
) -> PolarsResult<PrimitiveArray<O>>
where
O: NativeType,
F: Fn(chrono::DateTime<chrono_tz::Tz>) -> O,
Expand All @@ -260,7 +252,7 @@ fn chrono_tz<F, O>(
_: TimeUnit,
timezone_str: &str,
_: F,
) -> Result<PrimitiveArray<O>>
) -> PolarsResult<PrimitiveArray<O>>
where
O: NativeType,
F: Fn(chrono::DateTime<chrono::FixedOffset>) -> O,
Expand Down
16 changes: 7 additions & 9 deletions crates/nano-arrow/src/io/ipc/append/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
// write new batch
// write new footer
use std::io::{Read, Seek, SeekFrom, Write};
use polars_error::{polars_bail, polars_err, PolarsResult};

use super::endianness::is_native_little_endian;
use super::read::{self, FileMetadata};
use super::write::common::DictionaryTracker;
use super::write::writer::*;
use super::write::*;
use crate::error::{Error, Result};

impl<R: Read + Seek + Write> FileWriter<R> {
/// Creates a new [`FileWriter`] from an existing file, seeking to the last message
Expand All @@ -24,31 +24,29 @@ impl<R: Read + Seek + Write> FileWriter<R> {
mut writer: R,
metadata: FileMetadata,
options: WriteOptions,
) -> Result<FileWriter<R>> {
) -> PolarsResult<FileWriter<R>> {
if metadata.ipc_schema.is_little_endian != is_native_little_endian() {
return Err(Error::nyi(
"Appending to a file of a non-native endianness is still not supported",
));
polars_bail!(ComputeError: "appending to a file of a non-native endianness is not supported")
}

let dictionaries =
read::read_file_dictionaries(&mut writer, &metadata, &mut Default::default())?;

let last_block = metadata.blocks.last().ok_or_else(|| {
Error::oos("An Arrow IPC file must have at least 1 message (the schema message)")
polars_err!(oos = "an Arrow IPC file must have at least 1 message (the schema message)")
})?;
let offset: u64 = last_block
.offset
.try_into()
.map_err(|_| Error::oos("The block's offset must be a positive number"))?;
.map_err(|_| polars_err!(oos = "the block's offset must be a positive number"))?;
let meta_data_length: u64 = last_block
.meta_data_length
.try_into()
.map_err(|_| Error::oos("The block's meta length must be a positive number"))?;
.map_err(|_| polars_err!(oos = "the block's meta length must be a positive number"))?;
let body_length: u64 = last_block
.body_length
.try_into()
.map_err(|_| Error::oos("The block's body length must be a positive number"))?;
.map_err(|_| polars_err!(oos = "the block's body length must be a positive number"))?;
let offset: u64 = offset + meta_data_length + body_length;

writer.seek(SeekFrom::Start(offset))?;
Expand Down
3 changes: 2 additions & 1 deletion crates/nano-arrow/src/io/ipc/compression.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use polars_error::to_compute_err;
use crate::error::Result;

#[cfg(feature = "io_ipc_compression")]
Expand Down Expand Up @@ -36,7 +37,7 @@ pub fn compress_lz4(input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
use crate::error::Error;
let mut encoder = lz4::EncoderBuilder::new()
.build(output_buf)
.map_err(Error::from)?;
.map_err(to_compute_err)?;
encoder.write_all(input_buf)?;
encoder.finish().1.map_err(|e| e.into())
}
Expand Down
22 changes: 11 additions & 11 deletions crates/nano-arrow/src/io/ipc/read/array/binary.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::collections::VecDeque;
use std::io::{Read, Seek};
use polars_error::{polars_err, PolarsResult};

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};
use crate::array::BinaryArray;
use crate::buffer::Buffer;
use crate::datatypes::DataType;
use crate::error::{Error, Result};
use crate::offset::Offset;

#[allow(clippy::too_many_arguments)]
Expand All @@ -20,11 +20,11 @@ pub fn read_binary<O: Offset, R: Read + Seek>(
compression: Option<Compression>,
limit: Option<usize>,
scratch: &mut Vec<u8>,
) -> Result<BinaryArray<O>> {
) -> PolarsResult<BinaryArray<O>> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
polars_err!(oos =
"IPC: unable to fetch the field for {data_type:?}. The file or stream is corrupted."
))
)
})?;

let validity = read_validity(
Expand All @@ -41,7 +41,7 @@ pub fn read_binary<O: Offset, R: Read + Seek>(
let length: usize = field_node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
let length = limit.map(|limit| limit.min(length)).unwrap_or(length);

let offsets: Buffer<O> = read_buffer(
Expand All @@ -54,7 +54,7 @@ pub fn read_binary<O: Offset, R: Read + Seek>(
scratch,
)
// Older versions of the IPC format sometimes do not report an offset
.or_else(|_| Result::Ok(Buffer::<O>::from(vec![O::default()])))?;
.or_else(|_| PolarsResult::Ok(Buffer::<O>::from(vec![O::default()])))?;

let last_offset = offsets.last().unwrap().to_usize();
let values = read_buffer(
Expand All @@ -73,19 +73,19 @@ pub fn read_binary<O: Offset, R: Read + Seek>(
pub fn skip_binary(
field_nodes: &mut VecDeque<Node>,
buffers: &mut VecDeque<IpcBuffer>,
) -> Result<()> {
) -> PolarsResult<()> {
let _ = field_nodes.pop_front().ok_or_else(|| {
Error::oos("IPC: unable to fetch the field for binary. The file or stream is corrupted.")
polars_err!(oos = "IPC: unable to fetch the field for binary. The file or stream is corrupted.")
})?;

let _ = buffers
.pop_front()
.ok_or_else(|| Error::oos("IPC: missing validity buffer."))?;
.ok_or_else(|| polars_err!(oos = "IPC: missing validity buffer."))?;
let _ = buffers
.pop_front()
.ok_or_else(|| Error::oos("IPC: missing offsets buffer."))?;
.ok_or_else(|| polars_err!(oos = "IPC: missing offsets buffer."))?;
let _ = buffers
.pop_front()
.ok_or_else(|| Error::oos("IPC: missing values buffer."))?;
.ok_or_else(|| polars_err!(oos = "IPC: missing values buffer."))?;
Ok(())
}
Loading

0 comments on commit beeb6e6

Please sign in to comment.