fix: sink_csv deadlock (#13239)
ritchie46 authored Dec 24, 2023
1 parent ce09a43 commit 4046c73
Showing 3 changed files with 56 additions and 43 deletions.
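In short, as reconstructed from the diff below: `write_impl::write` used to pick its own thread count via `POOL.current_num_threads()` and always fan out through the global rayon pool, so a streaming `sink_csv` running on that same pool could deadlock. The fix threads an explicit `n_threads` parameter through `CsvWriter`, runs serially when only one thread is requested, and rebuilds `CsvSink` on top of the shared `FilesSink` IO-thread machinery with `.n_threads(1)`.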
17 changes: 16 additions & 1 deletion crates/polars-io/src/csv/write.rs
@@ -1,3 +1,4 @@
use polars_core::POOL;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

@@ -30,6 +31,7 @@ pub struct CsvWriter<W: Write> {
header: bool,
bom: bool,
batch_size: usize,
n_threads: usize,
}

impl<W> SerWriter<W> for CsvWriter<W>
@@ -49,6 +51,7 @@ where
header: true,
bom: false,
batch_size: 1024,
n_threads: POOL.current_num_threads(),
}
}

@@ -60,7 +63,13 @@ where
if self.header {
write_impl::write_header(&mut self.buffer, &names, &self.options)?;
}
write_impl::write(&mut self.buffer, df, self.batch_size, &self.options)
write_impl::write(
&mut self.buffer,
df,
self.batch_size,
&self.options,
self.n_threads,
)
}
}

@@ -149,6 +158,11 @@ where
self
}

pub fn n_threads(mut self, n_threads: usize) -> Self {
self.n_threads = n_threads;
self
}

pub fn batched(self, _schema: &Schema) -> PolarsResult<BatchedWriter<W>> {
let expects_bom = self.bom;
let expects_header = self.header;
@@ -188,6 +202,7 @@ impl<W: Write> BatchedWriter<W> {
df,
self.writer.batch_size,
&self.writer.options,
self.writer.n_threads,
)?;
Ok(())
}
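The new `n_threads` setter slots into the existing builder chain; callers that embed the writer in an already-parallel context can pin it to one thread. A minimal sketch, assuming the usual `polars` preludes (the output path and function name are illustrative):

```rust
use polars_core::prelude::*;
use polars_io::prelude::*;

// Write a DataFrame as CSV without fanning out over the rayon pool.
fn write_csv_serially(df: &mut DataFrame) -> PolarsResult<()> {
    let file = std::fs::File::create("out.csv")?;
    CsvWriter::new(file)
        .n_threads(1) // serialize chunks on the calling thread
        .finish(df)
}
```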
16 changes: 11 additions & 5 deletions crates/polars-io/src/csv/write_impl.rs
@@ -281,6 +281,7 @@ pub(crate) fn write<W: Write>(
df: &DataFrame,
chunk_size: usize,
options: &SerializeOptions,
n_threads: usize,
) -> PolarsResult<()> {
for s in df.get_columns() {
let nested = match s.dtype() {
@@ -379,7 +380,6 @@ pub(crate) fn write<W: Write>(
let time_zones = time_zones.into_iter().collect::<Vec<_>>();

let len = df.height();
let n_threads = POOL.current_num_threads();
let total_rows_per_pool_iter = n_threads * chunk_size;
let any_value_iter_pool = LowContentionPool::<Vec<_>>::new(n_threads);
let write_buffer_pool = LowContentionPool::<Vec<_>>::new(n_threads);
@@ -388,8 +388,9 @@

// holds the buffers that will be written
let mut result_buf: Vec<PolarsResult<Vec<u8>>> = Vec::with_capacity(n_threads);

while n_rows_finished < len {
let par_iter = (0..n_threads).into_par_iter().map(|thread_no| {
let buf_writer = |thread_no| {
let thread_offset = thread_no * chunk_size;
let total_offset = n_rows_finished + thread_offset;
let mut df = df.slice(total_offset as i64, chunk_size);
@@ -453,10 +454,15 @@ pub(crate) fn write<W: Write>(
any_value_iter_pool.set(col_iters);

Ok(write_buffer)
});
};

// rayon will ensure the right order
POOL.install(|| result_buf.par_extend(par_iter));
if n_threads > 1 {
let par_iter = (0..n_threads).into_par_iter().map(buf_writer);
// rayon will ensure the right order
POOL.install(|| result_buf.par_extend(par_iter));
} else {
result_buf.push(buf_writer(0));
}

for buf in result_buf.drain(..) {
let mut buf = buf?;
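The heart of the fix is that dispatch: rayon is only entered when more than one thread is requested, so a writer configured with `n_threads = 1` never calls `POOL.install` from inside a pool worker. A self-contained sketch of the same pattern (simplified types; `buf_writer` stands in for the chunk serializer above):

```rust
use rayon::prelude::*;

/// Fan out over the pool only when real parallelism is requested,
/// otherwise run inline. The inline path never re-enters the global
/// pool, which is what breaks the deadlock when the caller is itself
/// a pool worker.
fn collect_buffers<F>(n_threads: usize, buf_writer: F) -> Vec<Vec<u8>>
where
    F: Fn(usize) -> Vec<u8> + Send + Sync,
{
    if n_threads > 1 {
        // `into_par_iter` over a range preserves index order in `collect`.
        (0..n_threads).into_par_iter().map(&buf_writer).collect()
    } else {
        vec![buf_writer(0)]
    }
}
```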
66 changes: 29 additions & 37 deletions crates/polars-pipe/src/executors/sinks/file_sink.rs
@@ -1,6 +1,5 @@
use std::any::Any;
use std::path::Path;
use std::sync::Mutex;
use std::thread::JoinHandle;

use crossbeam_channel::{bounded, Receiver, Sender};
@@ -55,6 +54,17 @@ impl<W: std::io::Write> SinkWriter for polars_io::ipc::BatchedWriter<W> {
}
}

#[cfg(feature = "csv")]
impl SinkWriter for polars_io::csv::BatchedWriter<std::fs::File> {
fn _write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
self.write_batch(df)
}

fn _finish(&mut self) -> PolarsResult<()> {
Ok(())
}
}

#[cfg(feature = "json")]
impl SinkWriter for BatchedWriter<std::fs::File> {
fn _write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
@@ -219,14 +229,11 @@ impl IpcCloudSink {
}

#[cfg(feature = "csv")]
#[derive(Clone)]
pub struct CsvSink {
writer: Arc<Mutex<polars_io::csv::BatchedWriter<std::fs::File>>>,
}
pub struct CsvSink {}
#[cfg(feature = "csv")]
impl CsvSink {
#[allow(clippy::new_ret_no_self)]
pub fn new(path: &Path, options: CsvWriterOptions, schema: &Schema) -> PolarsResult<Self> {
pub fn new(path: &Path, options: CsvWriterOptions, schema: &Schema) -> PolarsResult<FilesSink> {
let file = std::fs::File::create(path)?;
let writer = CsvWriter::new(file)
.include_bom(options.include_bom)
@@ -241,41 +248,26 @@ impl CsvSink {
.with_float_precision(options.serialize_options.float_precision)
.with_null_value(options.serialize_options.null)
.with_quote_style(options.serialize_options.quote_style)
.n_threads(1)
.batched(schema)?;

Ok(Self {
writer: Arc::new(Mutex::new(writer)),
})
}
}

// Csv has a sync implementation because it writes in parallel. The file sink would deadlock.
#[cfg(feature = "csv")]
impl Sink for CsvSink {
fn sink(&mut self, _: &PExecutionContext, chunk: DataChunk) -> PolarsResult<SinkResult> {
let mut writer = self.writer.lock().unwrap();
writer.write_batch(&chunk.data)?;
Ok(SinkResult::CanHaveMoreInput)
}

fn combine(&mut self, _other: &mut dyn Sink) {
// already synchronized
}

fn split(&self, _thread_no: usize) -> Box<dyn Sink> {
Box::new(self.clone())
}
let writer = Box::new(writer) as Box<dyn SinkWriter + Send + Sync>;

fn finalize(&mut self, _context: &PExecutionContext) -> PolarsResult<FinalizedSink> {
Ok(FinalizedSink::Finished(Default::default()))
}
let morsels_per_sink = morsels_per_sink();
let backpressure = morsels_per_sink * 2;
let (sender, receiver) = bounded(backpressure);

fn as_any(&mut self) -> &mut dyn Any {
self
}
let io_thread_handle = Arc::new(Some(init_writer_thread(
receiver,
writer,
options.maintain_order,
morsels_per_sink,
)));

fn fmt(&self) -> &str {
"csv_sink"
Ok(FilesSink {
sender,
io_thread_handle,
})
}
}

@@ -417,6 +409,6 @@ impl Sink for FilesSink {
self
}
fn fmt(&self) -> &str {
"file_sink"
"parquet_sink"
}
}
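With the writer pinned to one thread, `CsvSink::new` can return the same `FilesSink` used by the other formats: producers push batches into a bounded channel and one dedicated IO thread drains it serially. A self-contained sketch of that shape, assuming the `crossbeam-channel` crate (the trait and function names are simplified stand-ins for `SinkWriter`, `init_writer_thread`, and `morsels_per_sink`):

```rust
use crossbeam_channel::{bounded, Sender};
use std::thread::JoinHandle;

// Simplified stand-in for the `SinkWriter` trait in this file.
trait BatchWriter: Send {
    fn write_batch(&mut self, batch: &[u8]) -> std::io::Result<()>;
    fn finish(&mut self) -> std::io::Result<()>;
}

// One dedicated IO thread owns the writer; producers only touch the
// bounded channel, so none of them can block inside the rayon pool.
fn spawn_io_thread(
    mut writer: Box<dyn BatchWriter>,
    capacity: usize, // e.g. morsels_per_sink * 2, as above
) -> (Sender<Vec<u8>>, JoinHandle<std::io::Result<()>>) {
    let (sender, receiver) = bounded::<Vec<u8>>(capacity);
    let handle = std::thread::spawn(move || {
        // The loop ends once every `Sender` clone is dropped.
        for batch in receiver {
            writer.write_batch(&batch)?;
        }
        writer.finish()
    });
    (sender, handle)
}
```

Backpressure falls out of the channel bound: once `capacity` batches are queued, producers block in `send` instead of queueing unboundedly, mirroring the `morsels_per_sink * 2` bound in the diff.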
