Skip to content

Commit

Permalink
refactor(rust): Remove ad-hoc buffer pool (#19553)
Browse files Browse the repository at this point in the history
  • Loading branch information
orlp authored Nov 1, 2024
1 parent aaf0a11 commit c3c38a9
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 62 deletions.
46 changes: 20 additions & 26 deletions crates/polars-io/src/csv/write/write_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use arrow::legacy::time_zone::Tz;
use polars_core::prelude::*;
use polars_core::POOL;
use polars_error::polars_ensure;
use polars_utils::contention_pool::LowContentionPool;
use rayon::prelude::*;
use serializer::{serializer_for, string_serializer};

Expand Down Expand Up @@ -115,16 +114,12 @@ pub(crate) fn write<W: Write>(

let len = df.height();
let total_rows_per_pool_iter = n_threads * chunk_size;
let serializer_pool = LowContentionPool::<Vec<_>>::new(n_threads);
let write_buffer_pool = LowContentionPool::<Vec<_>>::new(n_threads);

let mut n_rows_finished = 0;

// holds the buffers that will be written
let mut result_buf: Vec<PolarsResult<Vec<u8>>> = Vec::with_capacity(n_threads);

let mut buffers: Vec<_> = (0..n_threads).map(|_| (Vec::new(), Vec::new())).collect();
while n_rows_finished < len {
let buf_writer = |thread_no| {
let buf_writer = |thread_no, write_buffer: &mut Vec<_>, serializers_vec: &mut Vec<_>| {
let thread_offset = thread_no * chunk_size;
let total_offset = n_rows_finished + thread_offset;
let mut df = df.slice(total_offset as i64, chunk_size);
Expand All @@ -141,15 +136,13 @@ pub(crate) fn write<W: Write>(
// the vectors to the buffer pool, the series have already been removed from the buffers
// in other words, the lifetime does not leave this scope
let cols = unsafe { std::mem::transmute::<&[Column], &[Column]>(cols) };
let mut write_buffer = write_buffer_pool.get();

if df.is_empty() {
return Ok(write_buffer);
return Ok(());
}

let mut serializers_vec = serializer_pool.get();
if serializers_vec.is_empty() {
serializers_vec = cols
*serializers_vec = cols
.iter()
.enumerate()
.map(|(i, col)| {
Expand All @@ -164,7 +157,7 @@ pub(crate) fn write<W: Write>(
.collect::<Result<_, _>>()?;
} else {
debug_assert_eq!(serializers_vec.len(), cols.len());
for (col_iter, col) in std::iter::zip(&mut serializers_vec, cols) {
for (col_iter, col) in std::iter::zip(serializers_vec.iter_mut(), cols) {
col_iter.update_array(&*col.as_materialized_series().chunks()[0]);
}
}
Expand All @@ -174,33 +167,34 @@ pub(crate) fn write<W: Write>(
let len = std::cmp::min(cols[0].len(), chunk_size);

for _ in 0..len {
serializers[0].serialize(&mut write_buffer, options);
serializers[0].serialize(write_buffer, options);
for serializer in &mut serializers[1..] {
write_buffer.push(options.separator);
serializer.serialize(&mut write_buffer, options);
serializer.serialize(write_buffer, options);
}

write_buffer.extend_from_slice(options.line_terminator.as_bytes());
}

serializer_pool.set(serializers_vec);

Ok(write_buffer)
Ok(())
};

if n_threads > 1 {
let par_iter = (0..n_threads).into_par_iter().map(buf_writer);
// rayon will ensure the right order
POOL.install(|| result_buf.par_extend(par_iter));
POOL.install(|| {
buffers
.par_iter_mut()
.enumerate()
.map(|(i, (w, s))| buf_writer(i, w, s))
.collect::<PolarsResult<()>>()
})?;
} else {
result_buf.push(buf_writer(0));
let (w, s) = &mut buffers[0];
buf_writer(0, w, s)?;
}

for buf in result_buf.drain(..) {
let mut buf = buf?;
writer.write_all(&buf)?;
buf.clear();
write_buffer_pool.set(buf);
for (write_buffer, _) in &mut buffers {
writer.write_all(write_buffer)?;
write_buffer.clear();
}

n_rows_finished += total_rows_per_pool_iter;
Expand Down
35 changes: 0 additions & 35 deletions crates/polars-utils/src/contention_pool.rs

This file was deleted.

1 change: 0 additions & 1 deletion crates/polars-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ pub mod cache;
pub mod cardinality_sketch;
pub mod cell;
pub mod clmul;
pub mod contention_pool;
pub mod cpuid;
mod error;
pub mod floor_divmod;
Expand Down

0 comments on commit c3c38a9

Please sign in to comment.