Commit e06abce

Merge remote-tracking branch 'upstream/main' into move-panic

MarcoGorelli committed May 2, 2024
2 parents d8f5940 + 414e5f6
Showing 53 changed files with 2,840 additions and 474 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/lint-global.yml

@@ -15,4 +15,4 @@ jobs:
       - name: Lint Markdown and TOML
         uses: dprint/[email protected]
       - name: Spell Check with Typos
-        uses: crate-ci/typos@v1.20.10
+        uses: crate-ci/typos@v1.21.0
7 changes: 6 additions & 1 deletion .gitignore

@@ -8,7 +8,6 @@
 .yarn/
 coverage.lcov
 coverage.xml
-data/
 polars/vendor
 
 # OS
@@ -32,6 +31,12 @@ __pycache__/
 .cargo/
 target/
 
+# Data
+*.csv
+*.parquet
+*.feather
+*.tbl
+
 # Project
 /docs/data/
 /docs/images/
2 changes: 1 addition & 1 deletion _typos.toml

@@ -29,4 +29,4 @@ extend-glob = ["*.gz"]
 check-file = false
 
 [files]
-extend-exclude = ["_typos.toml"]
+extend-exclude = ["_typos.toml", "dists.dss"]
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/array/mod.rs

@@ -266,7 +266,7 @@ impl std::fmt::Debug for dyn Array + '_ {
         match self.data_type().to_physical_type() {
             Null => fmt_dyn!(self, NullArray, f),
             Boolean => fmt_dyn!(self, BooleanArray, f),
-            Primitive(primitive) => with_match_primitive_type!(primitive, |$T| {
+            Primitive(primitive) => with_match_primitive_type_full!(primitive, |$T| {
                fmt_dyn!(self, PrimitiveArray<$T>, f)
            }),
            BinaryView => fmt_dyn!(self, BinaryViewArray, f),
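Note: with_match_primitive_type_full! follows the same dispatch pattern as with_match_primitive_type!, expanding one body per physical primitive type, with the _full variant presumably covering the complete primitive list (hence the swap here, since Debug formatting must handle every primitive). A standalone sketch of the pattern, not the real polars-arrow macro, using a simplified |T| binding rather than the |$T| form the real macro takes:

// Sketch only: a local stand-in enum and macro illustrating per-type dispatch.
#[allow(dead_code)]
#[derive(Clone, Copy)]
enum PrimitiveSketch {
    Int32,
    Int64,
    Float64,
}

macro_rules! with_match_primitive_sketch {
    ($prim:expr, |$T:ident| $body:expr) => {
        match $prim {
            PrimitiveSketch::Int32 => {
                type $T = i32;
                $body
            },
            PrimitiveSketch::Int64 => {
                type $T = i64;
                $body
            },
            PrimitiveSketch::Float64 => {
                type $T = f64;
                $body
            },
        }
    };
}

fn main() {
    let p = PrimitiveSketch::Int64;
    // The same body is expanded per variant; here it reports the size of `T`.
    let size = with_match_primitive_sketch!(p, |T| std::mem::size_of::<T>());
    assert_eq!(size, 8);
}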
89 changes: 68 additions & 21 deletions crates/polars-arrow/src/mmap/array.rs

@@ -11,7 +11,7 @@ use crate::io::ipc::read::{Dictionaries, IpcBuffer, Node, OutOfSpecKind};
 use crate::io::ipc::IpcField;
 use crate::offset::Offset;
 use crate::types::NativeType;
-use crate::{match_integer_type, with_match_primitive_type};
+use crate::{match_integer_type, with_match_primitive_type_full};
 
 fn get_buffer_bounds(buffers: &mut VecDeque<IpcBuffer>) -> PolarsResult<(usize, usize)> {
     let buffer = buffers.pop_front().ok_or_else(
@@ -29,6 +29,19 @@ fn get_buffer_bounds(buffers: &mut VecDeque<IpcBuffer>) -> PolarsResult<(usize,
     Ok((offset, length))
 }
 
+/// Checks that the length of `bytes` is at least `size_of::<T>() * expected_len`, and
+/// returns a boolean indicating whether it is aligned.
+fn check_bytes_len_and_is_aligned<T: NativeType>(
+    bytes: &[u8],
+    expected_len: usize,
+) -> PolarsResult<bool> {
+    if bytes.len() < std::mem::size_of::<T>() * expected_len {
+        polars_bail!(ComputeError: "buffer's length is too small in mmap")
+    };
+
+    Ok(bytemuck::try_cast_slice::<_, T>(bytes).is_ok())
+}
+
 fn get_buffer<'a, T: NativeType>(
     data: &'a [u8],
     block_offset: usize,
@@ -42,13 +55,8 @@ fn get_buffer<'a, T: NativeType>(
         .get(block_offset + offset..block_offset + offset + length)
         .ok_or_else(|| polars_err!(ComputeError: "buffer out of bounds"))?;
 
-    // validate alignment
-    let v: &[T] = bytemuck::try_cast_slice(values)
-        .map_err(|_| polars_err!(ComputeError: "buffer not aligned for mmap"))?;
-
-    if v.len() < num_rows {
-        polars_bail!(ComputeError: "buffer's length is too small in mmap",
-        )
+    if !check_bytes_len_and_is_aligned::<T>(values, num_rows)? {
+        polars_bail!(ComputeError: "buffer not aligned for mmap");
     }
 
     Ok(values)
@@ -270,19 +278,58 @@ fn mmap_primitive<P: NativeType, T: AsRef<[u8]>>(
 
     let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());
 
-    let values = get_buffer::<P>(data_ref, block_offset, buffers, num_rows)?.as_ptr();
+    let bytes = get_bytes(data_ref, block_offset, buffers)?;
+    let is_aligned = check_bytes_len_and_is_aligned::<P>(bytes, num_rows)?;
 
-    Ok(unsafe {
-        create_array(
-            data,
-            num_rows,
-            null_count,
-            [validity, Some(values)].into_iter(),
-            [].into_iter(),
-            None,
-            None,
-        )
-    })
+    let out = if is_aligned || std::mem::size_of::<T>() <= 8 {
+        assert!(
+            is_aligned,
+            "primitive type with size <= 8 bytes should have been aligned"
+        );
+        let bytes_ptr = bytes.as_ptr();
+
+        unsafe {
+            create_array(
+                data,
+                num_rows,
+                null_count,
+                [validity, Some(bytes_ptr)].into_iter(),
+                [].into_iter(),
+                None,
+                None,
+            )
+        }
+    } else {
+        let mut values = vec![P::default(); num_rows];
+        unsafe {
+            std::ptr::copy_nonoverlapping(
+                bytes.as_ptr(),
+                values.as_mut_ptr() as *mut u8,
+                bytes.len(),
+            )
+        };
+        // Now we need to keep the new buffer alive
+        let owned_data = Arc::new((
+            // We can drop the original ref if we don't have a validity
+            validity.and(Some(data)),
+            values,
+        ));
+        let bytes_ptr = owned_data.1.as_ptr() as *mut u8;
+
+        unsafe {
+            create_array(
+                owned_data,
+                num_rows,
+                null_count,
+                [validity, Some(bytes_ptr)].into_iter(),
+                [].into_iter(),
+                None,
+                None,
+            )
+        }
+    };
+
+    Ok(out)
 }
 
 #[allow(clippy::too_many_arguments)]
@@ -482,7 +529,7 @@ fn get_array<T: AsRef<[u8]>>(
     match data_type.to_physical_type() {
         Null => mmap_null(data, &node, block_offset, buffers),
         Boolean => mmap_boolean(data, &node, block_offset, buffers),
-        Primitive(p) => with_match_primitive_type!(p, |$T| {
+        Primitive(p) => with_match_primitive_type_full!(p, |$T| {
            mmap_primitive::<$T, _>(data, &node, block_offset, buffers)
        }),
        Utf8 | Binary => mmap_binary::<i32, _>(data, &node, block_offset, buffers),
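The pattern the new helper enables, try a zero-copy cast and fall back to a copy into owned (and therefore aligned) storage, can be reproduced outside polars. A minimal standalone sketch, assuming the bytemuck crate; bytes_to_values is a hypothetical name, not part of this commit:

use bytemuck::{Pod, Zeroable};

// Hypothetical helper mirroring the check-then-copy fallback above:
// use the bytes in place when aligned for T, memcpy into a Vec otherwise.
fn bytes_to_values<T: Pod>(bytes: &[u8], expected_len: usize) -> Result<Vec<T>, String> {
    if bytes.len() < std::mem::size_of::<T>() * expected_len {
        return Err("buffer's length is too small".into());
    }
    match bytemuck::try_cast_slice::<u8, T>(bytes) {
        // Aligned: the cast view is valid; polars keeps this zero-copy,
        // we clone here only to keep the demo simple.
        Ok(values) => Ok(values[..expected_len].to_vec()),
        // Unaligned: copy the raw bytes into owned, aligned storage.
        Err(_) => {
            let mut values = vec![T::zeroed(); expected_len];
            // SAFETY: the length check above guarantees enough source bytes.
            unsafe {
                std::ptr::copy_nonoverlapping(
                    bytes.as_ptr(),
                    values.as_mut_ptr() as *mut u8,
                    expected_len * std::mem::size_of::<T>(),
                );
            }
            Ok(values)
        },
    }
}

fn main() {
    let backing: Vec<u64> = vec![1, 2, 3, 4];
    let bytes: &[u8] = bytemuck::cast_slice(&backing);
    // Aligned: values come straight from the cast.
    assert_eq!(bytes_to_values::<u64>(bytes, 4).unwrap(), vec![1, 2, 3, 4]);
    // Shift by one byte to break alignment: the copy fallback still succeeds.
    assert_eq!(bytes_to_values::<u64>(&bytes[1..], 3).unwrap().len(), 3);
}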
@@ -279,6 +279,10 @@ impl CategoricalChunked {
         self
     }
 
+    pub fn _with_fast_unique(self, toggle: bool) -> Self {
+        self.with_fast_unique(toggle)
+    }
+
     /// Get a reference to the mapping of categorical types to the string values.
     pub fn get_rev_map(&self) -> &Arc<RevMapping> {
         if let DataType::Categorical(Some(rev_map), _) | DataType::Enum(Some(rev_map), _) =
4 changes: 4 additions & 0 deletions crates/polars-core/src/datatypes/dtype.rs

@@ -120,6 +120,10 @@ impl PartialEq for DataType {
             (Array(left_inner, left_width), Array(right_inner, right_width)) => {
                 left_width == right_width && left_inner == right_inner
             },
+            (Unknown(l), Unknown(r)) => match (l, r) {
+                (UnknownKind::Int(_), UnknownKind::Int(_)) => true,
+                _ => l == r,
+            },
             _ => std::mem::discriminant(self) == std::mem::discriminant(other),
         }
     }
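The effect of the new arm: two dynamic integer literals compare as the same dtype even when their payloads differ, since the payload is only a hint for later materialization. A self-contained sketch with local stand-in types (these mirror but are not the real polars definitions):

#[derive(Debug, Clone, Copy, PartialEq)]
enum UnknownKind {
    Any,
    Int(i128),
    Float,
    Str,
}

#[derive(Debug, Clone, Copy)]
struct Unknown(UnknownKind);

impl PartialEq for Unknown {
    fn eq(&self, other: &Self) -> bool {
        match (self.0, other.0) {
            // Two dynamic int literals are the "same" dtype; the payload is
            // only a size hint used later when materializing.
            (UnknownKind::Int(_), UnknownKind::Int(_)) => true,
            (l, r) => l == r,
        }
    }
}

fn main() {
    assert_eq!(Unknown(UnknownKind::Int(1)), Unknown(UnknownKind::Int(1 << 40)));
    assert_ne!(Unknown(UnknownKind::Int(1)), Unknown(UnknownKind::Float));
}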
5 changes: 0 additions & 5 deletions crates/polars-core/src/series/from.rs

@@ -443,11 +443,6 @@ impl Series {
             Ok(StructChunked::new_unchecked(name, &fields).into_series())
         },
         ArrowDataType::FixedSizeBinary(_) => {
-            if verbose() {
-                eprintln!(
-                    "Polars does not support decimal types so the 'Series' are read as Float64"
-                );
-            }
             let chunks = cast_chunks(&chunks, &DataType::Binary, true)?;
             Ok(BinaryChunked::from_chunks(name, chunks).into_series())
         },
40 changes: 36 additions & 4 deletions crates/polars-core/src/series/mod.rs

@@ -29,7 +29,9 @@ pub use series_trait::{IsSorted, *};
 use crate::chunked_array::Settings;
 #[cfg(feature = "zip_with")]
 use crate::series::arithmetic::coerce_lhs_rhs;
-use crate::utils::{_split_offsets, handle_casting_failures, split_ca, split_series, Wrap};
+use crate::utils::{
+    _split_offsets, handle_casting_failures, materialize_dyn_int, split_ca, split_series, Wrap,
+};
 use crate::POOL;
 
 /// # Series
@@ -309,9 +311,39 @@
 
     /// Cast `[Series]` to another `[DataType]`.
     pub fn cast(&self, dtype: &DataType) -> PolarsResult<Self> {
-        // Best leave as is.
-        if !dtype.is_known() || (dtype.is_primitive() && dtype == self.dtype()) {
-            return Ok(self.clone());
+        match dtype {
+            DataType::Unknown(kind) => {
+                return match kind {
+                    // Best leave as is.
+                    UnknownKind::Any => Ok(self.clone()),
+                    UnknownKind::Int(v) => {
+                        if self.dtype().is_integer() {
+                            Ok(self.clone())
+                        } else {
+                            self.cast(&materialize_dyn_int(*v).dtype())
+                        }
+                    },
+                    UnknownKind::Float => {
+                        if self.dtype().is_float() {
+                            Ok(self.clone())
+                        } else {
+                            self.cast(&DataType::Float64)
+                        }
+                    },
+                    UnknownKind::Str => {
+                        if self.dtype().is_string() | self.dtype().is_categorical() {
+                            Ok(self.clone())
+                        } else {
+                            self.cast(&DataType::String)
+                        }
+                    },
+                };
+            },
+            // Best leave as is.
+            dt if dt.is_primitive() && dt == self.dtype() => {
+                return Ok(self.clone());
+            },
+            _ => {},
         }
         let ret = self.0.cast(dtype);
         let len = self.len();
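For the UnknownKind::Int(v) branch, "materializing" means choosing a concrete integer dtype for the literal v before casting. materialize_dyn_int is internal to polars-core; the sketch below assumes an Int32-first policy and is illustrative only, with a local stand-in dtype enum:

#[derive(Debug, PartialEq)]
enum ConcreteDType {
    Int32,
    Int64,
    UInt64,
}

// Hypothetical stand-in for materialize_dyn_int(v).dtype(): pick the first
// concrete integer type the literal fits in.
fn materialize_dyn_int_sketch(v: i128) -> ConcreteDType {
    if i32::try_from(v).is_ok() {
        ConcreteDType::Int32
    } else if i64::try_from(v).is_ok() {
        ConcreteDType::Int64
    } else {
        // Only huge positive literals land here.
        ConcreteDType::UInt64
    }
}

fn main() {
    assert_eq!(materialize_dyn_int_sketch(42), ConcreteDType::Int32);
    assert_eq!(materialize_dyn_int_sketch(1 << 40), ConcreteDType::Int64);
}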
41 changes: 28 additions & 13 deletions crates/polars-core/src/utils/supertype.rs

@@ -264,28 +264,43 @@ pub fn get_supertype(l: &DataType, r: &DataType) -> Option<DataType> {
         },
         (dt, Unknown(kind)) => {
             match kind {
                 // numeric vs float|str -> always float|str
                 UnknownKind::Float | UnknownKind::Int(_) if dt.is_float() | dt.is_string() => Some(dt.clone()),
-                UnknownKind::Float if dt.is_numeric() => Some(Unknown(UnknownKind::Float)),
+                UnknownKind::Float if dt.is_integer() => Some(Unknown(UnknownKind::Float)),
+                // Materialize float
+                UnknownKind::Float if dt.is_float() => Some(dt.clone()),
                 // Materialize str
                 UnknownKind::Str if dt.is_string() | dt.is_enum() => Some(dt.clone()),
                 // Materialize str
                 #[cfg(feature = "dtype-categorical")]
                 UnknownKind::Str if dt.is_categorical() => {
                     let Categorical(_, ord) = dt else { unreachable!()};
                     Some(Categorical(None, *ord))
                 },
                 // Keep unknown
                 dynam if dt.is_null() => Some(Unknown(*dynam)),
                 // Find integers sizes
                 UnknownKind::Int(v) if dt.is_numeric() => {
-                    let smallest_fitting_dtype = if dt.is_unsigned_integer() && v.is_positive() {
-                        materialize_dyn_int_pos(*v).dtype()
-                    } else {
-                        materialize_smallest_dyn_int(*v).dtype()
-                    };
-                    match dt {
-                        UInt64 if smallest_fitting_dtype.is_signed_integer() => {
-                            // Ensure we don't cast to float when dealing with dynamic literals
-                            Some(Int64)
-                        },
-                        _ => {
-                            get_supertype(dt, &smallest_fitting_dtype)
-                        }
-                    }
+                    // Both dyn int
+                    if let Unknown(UnknownKind::Int(v_other)) = dt {
+                        // Take the maximum value to ensure we bubble up the required minimal size.
+                        Some(Unknown(UnknownKind::Int(std::cmp::max(*v, *v_other))))
+                    }
+                    // dyn int vs number
+                    else {
+                        let smallest_fitting_dtype = if dt.is_unsigned_integer() && v.is_positive() {
+                            materialize_dyn_int_pos(*v).dtype()
+                        } else {
+                            materialize_smallest_dyn_int(*v).dtype()
+                        };
+                        match dt {
+                            UInt64 if smallest_fitting_dtype.is_signed_integer() => {
+                                // Ensure we don't cast to float when dealing with dynamic literals
+                                Some(Int64)
+                            },
+                            _ => {
+                                get_supertype(dt, &smallest_fitting_dtype)
+                            }
+                        }
+                    }
                 }
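Two of the rules above are worth restating with a worked example: when both sides are dynamic ints the larger hint wins, so the eventually materialized dtype fits both literals, and a dynamic int against a concrete integer resolves through the smallest dtype that fits the literal. A standalone sketch with stand-in types; smallest_fitting only approximates the internal materialize_smallest_dyn_int helper:

#[derive(Debug, PartialEq)]
enum IntWidth {
    Int8,
    Int16,
    Int32,
    Int64,
}

// Hypothetical "smallest fitting dtype" for a signed literal.
fn smallest_fitting(v: i128) -> IntWidth {
    if i8::try_from(v).is_ok() {
        IntWidth::Int8
    } else if i16::try_from(v).is_ok() {
        IntWidth::Int16
    } else if i32::try_from(v).is_ok() {
        IntWidth::Int32
    } else {
        IntWidth::Int64
    }
}

fn main() {
    // Unknown(Int(3)) vs Unknown(Int(300)): keep the max so the eventual
    // materialized dtype can hold both literals.
    assert_eq!(std::cmp::max(3_i128, 300), 300);
    // Unknown(Int(300)) vs a concrete integer: 300 needs at least Int16,
    // and get_supertype(dt, Int16) then finishes the job.
    assert_eq!(smallest_fitting(300), IntWidth::Int16);
}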
24 changes: 17 additions & 7 deletions crates/polars-io/src/csv/read/read_impl/batched_read.rs

@@ -7,6 +7,7 @@ use polars_core::frame::DataFrame;
 use polars_core::schema::SchemaRef;
 use polars_core::POOL;
 use polars_error::PolarsResult;
+use polars_utils::sync::SyncPtr;
 use polars_utils::IdxSize;
 use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
 
@@ -54,6 +55,8 @@ pub(crate) fn get_offsets(
     }
 }
 
+/// Reads bytes from `file` to `buf` and returns pointers into `buf` that can be parsed.
+/// TODO! this can be implemented without copying by pointing in the memmapped file.
 struct ChunkReader<'a> {
     file: &'a File,
     buf: Vec<u8>,
@@ -109,18 +112,23 @@ impl<'a> ChunkReader<'a> {
         self.buf_end = 0;
     }
 
-    fn return_slice(&self, start: usize, end: usize) -> (usize, usize) {
+    fn return_slice(&self, start: usize, end: usize) -> (SyncPtr<u8>, usize) {
         let slice = &self.buf[start..end];
         let len = slice.len();
-        (slice.as_ptr() as usize, len)
+        (slice.as_ptr().into(), len)
     }
 
-    fn get_buf(&self) -> (usize, usize) {
+    fn get_buf_remaining(&self) -> (SyncPtr<u8>, usize) {
         let slice = &self.buf[self.buf_end..];
         let len = slice.len();
-        (slice.as_ptr() as usize, len)
+        (slice.as_ptr().into(), len)
     }
 
+    // Get next `n` offset positions. Where `n` is number of chunks.
+
+    // This returns pointers into slices into `buf`
+    // we must process the slices before the next call
+    // as that will overwrite the slices
     fn read(&mut self, n: usize) -> bool {
         self.reslice();
 
@@ -267,7 +275,7 @@ pub struct BatchedCsvReaderRead<'a> {
     chunk_size: usize,
     finished: bool,
     file_chunk_reader: ChunkReader<'a>,
-    file_chunks: Vec<(usize, usize)>,
+    file_chunks: Vec<(SyncPtr<u8>, usize)>,
     projection: Vec<usize>,
     starting_point_offset: Option<usize>,
     row_index: Option<RowIndex>,
@@ -292,6 +300,7 @@ pub struct BatchedCsvReaderRead<'a> {
 }
 //
 impl<'a> BatchedCsvReaderRead<'a> {
+    /// `n` number of batches.
     pub fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
         if n == 0 || self.finished {
             return Ok(None);
@@ -320,7 +329,8 @@ impl<'a> BatchedCsvReaderRead<'a> {
         // ensure we process the final slice as well.
         if self.file_chunk_reader.finished && self.file_chunks.len() < n {
             // get the final slice
-            self.file_chunks.push(self.file_chunk_reader.get_buf());
+            self.file_chunks
+                .push(self.file_chunk_reader.get_buf_remaining());
             self.finished = true
         }
 
@@ -333,7 +343,7 @@ impl<'a> BatchedCsvReaderRead<'a> {
         self.file_chunks
             .par_iter()
             .map(|(ptr, len)| {
-                let chunk = unsafe { std::slice::from_raw_parts(*ptr as *const u8, *len) };
+                let chunk = unsafe { std::slice::from_raw_parts(ptr.get(), *len) };
                 let stop_at_n_bytes = chunk.len();
                 let mut df = read_chunk(
                     chunk,
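The SyncPtr change is what lets file_chunks cross rayon's par_iter: a raw pointer is not Send or Sync, so the previous code smuggled it through as usize, while the new code wraps it in a newtype that unsafely asserts both traits. A minimal sketch of the idea, single-threaded for brevity; polars_utils::sync::SyncPtr differs in detail (for one, it wraps a mutable pointer):

// Sketch of a raw-pointer newtype marked Send + Sync so per-chunk pointers
// into one buffer can cross a thread boundary.
#[derive(Clone, Copy)]
struct SyncPtr<T>(*const T);

// SAFETY (by assertion, as in the pattern above): the user guarantees the
// pointee outlives all uses and access is properly synchronized.
unsafe impl<T> Send for SyncPtr<T> {}
unsafe impl<T> Sync for SyncPtr<T> {}

impl<T> SyncPtr<T> {
    fn get(self) -> *const T {
        self.0
    }
}

impl<T> From<*const T> for SyncPtr<T> {
    fn from(p: *const T) -> Self {
        SyncPtr(p)
    }
}

fn main() {
    let buf: Vec<u8> = (0u8..=255).collect();
    // (pointer, len) pairs into `buf`; valid only while `buf` is alive and
    // not reallocated, the same contract the ChunkReader comments spell out.
    let chunks: Vec<(SyncPtr<u8>, usize)> = buf
        .chunks(64)
        .map(|c| (c.as_ptr().into(), c.len()))
        .collect();
    let total: usize = chunks
        .iter()
        .map(|(ptr, len)| unsafe { std::slice::from_raw_parts(ptr.get(), *len) }.len())
        .sum();
    assert_eq!(total, 256);
}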