Parallelize Stateless (CSV/JSON) File Write Serialization #7452
Conversation
        }
        false => {
            // TODO if we encounter an error during shutdown, delete previously written files?
            writer.shutdown()
I think this is the trickiest case to ensure the write is atomic. Suppose we have two writers, A and B. Writer A could successfully commit and shut down. Then, before Writer B can complete, a network or hardware fault could prevent Writer B from finalizing or Writer A from aborting.
For this to be atomic, we would need some way to simultaneously commit all or none of our multipart writers. I don't think ObjectStores (S3, etc.) support a way to do that.
Downstream table providers could make this atomic in practice via an atomic metadata operation, which I believe is how DeltaLake and friends work.
Yes, I agree that if someone wants atomic commit/rollback they should build that in at a higher level than datafusion -- there isn't much we can do with just the object store API
            }
        },
        Err(_) => {
            // Don't panic, instead try to clean up as many writers as possible.
Join errors (perhaps because a thread was killed by the OS?) could also result in non-atomic writes, since we would have no way to recover ownership of the writer and abort it.
You should return the error after aborting the writer, in case an execution error occurs within the plans.
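A minimal, generic sketch of that suggestion (a hypothetical helper using only standard tokio types, not the PR's actual code): clean up whatever writers remain, then propagate the original error instead of swallowing it.

use tokio::io::{AsyncWrite, AsyncWriteExt};

// Best-effort cleanup that still surfaces the original execution error:
// shut down every remaining writer, ignoring cleanup failures, then hand the
// triggering error back to the caller instead of returning Ok.
async fn shutdown_all_then_fail<W, T, E>(writers: Vec<W>, err: E) -> Result<T, E>
where
    W: AsyncWrite + Unpin,
{
    for mut writer in writers {
        // Ignore shutdown failures here so every writer gets a chance to clean up.
        let _ = writer.shutdown().await;
    }
    Err(err)
}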
datafusion/core/tests/fifo.rs (outdated)
@@ -336,6 +336,7 @@ mod unix_test {

/// It tests the INSERT INTO functionality.
#[tokio::test]
#[ignore]
This test is deadlocking. I think this has to do with how this test is spawning threads.
Based on the other tests passing, I don't believe this PR has broken anything with FIFO tables.
@mustafasrepo Any idea how to fix this test for this PR?
I am currently checking for any potential problems.
It's not the way the threads are started that's the issue, but rather the assumption that if a FIFO file receives 10 rows and the batch size is also 10, it will create a record batch. The current implementation of serialize_rb_stream_to_object_store requires access to all data before it can proceed.
while let Some(maybe_batch) = data_stream.next().await {
    let mut serializer_clone = match serializer.duplicate() {
        Ok(s) => s,
        Err(_) => {
            return Err((
                writer,
                DataFusionError::Internal(
                    "Unknown error writing to object store".into(),
                ),
            ))
        }
    };
    serialize_tasks.push(task::spawn(async move {
        let batch = maybe_batch?;
        let num_rows = batch.num_rows();
        let bytes = serializer_clone.serialize(batch).await?;
        Ok((num_rows, bytes))
    }));
}
@alamb Any tips on how to benchmark this more scientifically? Would adding "insert into" and "copy" queries to our existing benchmarking framework be a reasonable idea?
async fn serialize_rb_stream_to_object_store(
    mut data_stream: Pin<Box<dyn RecordBatchStream + Send>>,
    mut serializer: Box<dyn BatchSerializer>,
    mut writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
) -> std::result::Result<
    (
        Box<dyn BatchSerializer>,
        AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
        u64,
    ),
    (
        AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
        DataFusionError,
    ),
> {
    let mut row_count = 0;
    // Not using JoinSet here since we want to ultimately write to ObjectStore preserving file order
    let mut serialize_tasks: Vec<JoinHandle<Result<(usize, Bytes), DataFusionError>>> =
        Vec::new();
    while let Some(maybe_batch) = data_stream.next().await {
        let mut serializer_clone = match serializer.duplicate() {
            Ok(s) => s,
            Err(_) => {
                return Err((
                    writer,
                    DataFusionError::Internal(
                        "Unknown error writing to object store".into(),
                    ),
                ))
            }
        };
        serialize_tasks.push(task::spawn(async move {
            let batch = maybe_batch?;
            let num_rows = batch.num_rows();
            let bytes = serializer_clone.serialize(batch).await?;
            Ok((num_rows, bytes))
        }));
    }
    for serialize_result in serialize_tasks {
        let result = serialize_result.await;
        match result {
            Ok(res) => {
                let (cnt, bytes) = match res {
                    Ok(r) => r,
                    Err(e) => return Err((writer, e)),
                };
                row_count += cnt;
                match writer.write_all(&bytes).await {
                    Ok(_) => (),
                    Err(_) => {
                        return Err((
                            writer,
                            DataFusionError::Internal(
                                "Unknown error writing to object store".into(),
                            ),
                        ))
                    }
                };
            }
            Err(_) => {
                return Err((
                    writer,
                    DataFusionError::Internal(
                        "Unknown error writing to object store".into(),
                    ),
                ))
            }
        }
    }

    Ok((serializer, writer, row_count as u64))
}
async fn serialize_rb_stream_to_object_store(
    mut data_stream: Pin<Box<dyn RecordBatchStream + Send>>,
    mut serializer: Box<dyn BatchSerializer>,
    mut writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
) -> std::result::Result<
    (
        Box<dyn BatchSerializer>,
        AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
        u64,
    ),
    (
        AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
        DataFusionError,
    ),
> {
    let (tx, mut rx) =
        mpsc::channel::<JoinHandle<Result<(usize, Bytes), DataFusionError>>>(100); // buffer size of 100, adjust as needed
    let serialize_task = tokio::spawn(async move {
        while let Some(maybe_batch) = data_stream.next().await {
            match serializer.duplicate() {
                Ok(mut serializer_clone) => {
                    let handle = tokio::spawn(async move {
                        let batch = maybe_batch?;
                        let num_rows = batch.num_rows();
                        let bytes = serializer_clone.serialize(batch).await?;
                        Ok((num_rows, bytes))
                    });
                    tx.send(handle).await.map_err(|_| {
                        DataFusionError::Internal(
                            "Unknown error writing to object store".into(),
                        )
                    })?;
                    yield_now().await;
                }
                Err(_) => {
                    return Err(DataFusionError::Internal(
                        "Unknown error writing to object store".into(),
                    ))
                }
            }
        }
        Ok(serializer)
    });
    let mut row_count = 0;
    while let Some(handle) = rx.recv().await {
        match handle.await {
            Ok(Ok((cnt, bytes))) => {
                match writer.write_all(&bytes).await {
                    Ok(_) => (),
                    Err(_) => {
                        return Err((
                            writer,
                            DataFusionError::Internal(
                                "Unknown error writing to object store".into(),
                            ),
                        ))
                    }
                };
                row_count += cnt;
            }
            Ok(Err(e)) => {
                // Return the writer along with the error
                return Err((writer, e));
            }
            Err(_) => {
                // Handle task panic or cancellation
                return Err((
                    writer,
                    DataFusionError::Internal(
                        "Serialization task panicked or was cancelled".into(),
                    ),
                ));
            }
        }
    }
    let serializer = match serialize_task.await {
        Ok(Ok(serializer)) => serializer,
        Ok(Err(e)) => return Err((writer, e)),
        Err(_) => {
            return Err((
                writer,
                DataFusionError::Internal("Unknown error writing to object store".into()),
            ))
        }
    };
    Ok((serializer, writer, row_count as u64))
}
How about this?
This is fantastic, thank you @metesynnada! I tested this, and with just a small change it resulted in identical performance with much lower memory usage due to the backpressure introduced by the channel. The FIFO test is also passing now, so win-win!
@metesynnada Actually, I just realized that the fifo test passing depends on leaving the yield_now().await call in. That call results in the serialize tasks being run in sequence rather than in parallel, though, so performance is poor. I'm not quite sure how to get the fifo test to pass without the yield_now().await call.
Also, is your profiling runtime multi-threaded?
Thanks for the issue links. I tried calling yield_now().await every N iterations, but the FIFO insert test only passes when N == 1 (i.e. yield_now().await on every iteration).
I just pushed up a possible option of calling yield_now() only if the input is unbounded and otherwise skipping it. That allows bounded tables to benefit fully from the parallelization while still allowing the fifo test to pass as it currently does.
The fifo test does not use a multithreaded runtime, but when I benchmark performance I am running in a multithreaded runtime, yes.
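The compromise described above boils down to something like the following sketch (names are illustrative, not the exact code that was pushed):

use tokio::task::yield_now;

// Yield back to the runtime only for unbounded (e.g. FIFO) inputs, so bounded
// tables keep fully parallel serialization.
async fn maybe_yield(unbounded_input: bool) {
    if unbounded_input {
        // Gives the producer side a chance to run on a current-thread runtime,
        // which is what the FIFO test needs to avoid deadlock.
        yield_now().await;
    }
}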
I will check the entire pull request tomorrow. Thank you for your hard work. I am also pleased that the new mechanism has contributed to the performance.
Co-authored-by: Metehan Yıldırım <[email protected]>
Thank you @devinjdangelo -- I plan to study this PR carefully tomorrow
The PR looks great overall. Once the outstanding issues are resolved, I can give it a final review.
    // tracks if any errors were encountered in the process of aborting writers.
    // if true, we may not have a guarantee that all written data was cleaned up.
    let mut any_abort_errors = false;
    match single_file_output {
Can be if single_file_output { ... } else { ... }
    // Finalize or abort writers as appropriate
    for mut writer in finished_writers.into_iter() {
        match any_errors {
Can be if any_errors { ... } else { ... }
    };
    Ok((serializer, writer, row_count as u64))
}

/// Contains the common logic for serializing RecordBatches and
/// writing the resulting bytes to an ObjectStore.
/// Serialization is assumed to be stateless, i.e.
/// each RecordBatch can be serialized without any
/// dependency on the RecordBatches before or after.
pub(crate) async fn stateless_serialize_and_write_files(
The blocks inside the match serialize_rb_stream_to_object_store can be refactored into separate functions for better readability and maintainability.
pub(crate) async fn stateless_serialize_and_write_files(
data: Vec<SendableRecordBatchStream>,
mut serializers: Vec<Box<dyn BatchSerializer>>,
mut writers: Vec<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>>,
single_file_output: bool,
unbounded_input: bool,
) -> Result<u64> {
if single_file_output {
if serializers.len() != 1 || writers.len() != 1 {
return internal_err!("single_file_output is true, but got more than 1 writer!");
}
return serialize_single_file(data, serializers.remove(0), writers.remove(0), unbounded_input).await;
} else {
if data.len() != writers.len() {
return internal_err!("Did not get 1 writer for each output partition!");
}
return serialize_multiple_files(data, serializers, writers, unbounded_input).await;
}
}
    mut writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
    unbounded_input: bool,
) -> std::result::Result<
    (
The function signature is quite complex with nested types. Using type aliases can simplify the signature and make it more readable. Like
type WriterType = AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>;
type ResultTuple = (Box<dyn BatchSerializer>, WriterType, u64);
type ErrorTuple = (WriterType, DataFusionError);
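With those aliases, the signature from the diff above would read roughly as follows (a sketch only; the types come from the surrounding module, so this is not standalone code):

async fn serialize_rb_stream_to_object_store(
    mut data_stream: Pin<Box<dyn RecordBatchStream + Send>>,
    mut serializer: Box<dyn BatchSerializer>,
    mut writer: WriterType,
    unbounded_input: bool,
) -> std::result::Result<ResultTuple, ErrorTuple> {
    // ...body unchanged...
}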
                        Ok((num_rows, bytes))
                    });
                    tx.send(handle).await.map_err(|_| {
                        DataFusionError::Internal(
Define the return error as a closure at the beginning of the method to make it common.
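A minimal sketch of that idea (illustrative, not the PR's actual code): build the repeated "unknown error" value from a closure defined once near the top of the function, so each error arm stays short.

// Hypothetical helper closure, defined once at the beginning of the method:
let unknown_write_err =
    || DataFusionError::Internal("Unknown error writing to object store".into());

// ...then each error arm shrinks to something like:
tx.send(handle).await.map_err(|_| unknown_write_err())?;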
@@ -137,8 +137,15 @@ impl TableProviderFactory for ListingTableFactory {
        let mut statement_options = StatementOptions::from(&cmd.options);

        // Extract ListingTable specific options if present or set default
        // Discard unbounded option if present
        statement_options.take_str_option("unbounded");
        let unbounded = if infinite_source {
You can remove this logic since we support unbounded tables with CREATE UNBOUNDED EXTERNAL TABLE.
That syntax is supported, but the test for FIFO insert passes unbounded as a string option. The logic here was added to handle either case: the CREATE UNBOUNDED syntax takes precedence, but if it is not present we check for OPTIONS (unbounded true).
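Roughly, that precedence looks like the sketch below. This is a simplified reconstruction (the diff above is truncated), and it assumes take_str_option returns an Option<String>, as its use in the diff suggests.

// CREATE UNBOUNDED EXTERNAL TABLE wins; otherwise fall back to the string
// option for backwards compatibility with OPTIONS ('unbounded' 'true').
let unbounded = if infinite_source {
    // Discard any redundant option so it is not treated as unknown later.
    statement_options.take_str_option("unbounded");
    true
} else {
    statement_options
        .take_str_option("unbounded")
        .map(|value| value.eq_ignore_ascii_case("true"))
        .unwrap_or(false)
};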
I see, I think it is a leftover. I guess we can change the test to use CREATE UNBOUNDED EXTERNAL if I am not missing anything.
I think it is OK if both methods are supported. That would provide backwards compatibility for anyone who used this syntax in past versions.
@@ -213,7 +220,8 @@ impl TableProviderFactory for ListingTableFactory {
         .with_file_sort_order(cmd.order_exprs.clone())
         .with_insert_mode(insert_mode)
         .with_single_file(single_file)
-        .with_write_options(file_type_writer_options);
+        .with_write_options(file_type_writer_options)
+        .with_infinite_source(unbounded);
Please remove the line with .with_infinite_source(unbounded); as it duplicates the logic above.
I took the liberty of doing so in e6f1b2f
Thanks!
This is pretty compelling @devinjdangelo 👍 Thank you so much -- this is a great feature for DataFusion. I can't wait to see parallelized parquet writes :)
I ran some performance experiments locally and it shows the same performance you report in the PR description.
I think there are a few suggestions from @metesynnada that would make the code in this PR easier to read, but I think they could potentially be done as follow-ons.
Performance Results (at least 4x faster)
This branch:
❯ copy 'traces' to 'foo.csv';
+---------+
| count |
+---------+
| 5185717 |
+---------+
1 row in set. Query took 2.882 seconds.
Main:
❯ copy 'traces' to 'foo.csv';
+---------+
| count |
+---------+
| 5185717 |
+---------+
1 row in set. Query took 11.532 seconds.
/// concurrently. Data order is preserved. In the event of an error,
/// the ObjectStore writer is returned to the caller in addition to an error,
/// so that the caller may handle aborting failed writes.
async fn serialize_rb_stream_to_object_store(
I have some ideas how to simplify this code, which I will try out shortly, but I also think it can be merged like this too.
I tried to rewrite this as a futures::stream computation using buffered -- but I got stuck on some "higher-ranked lifetime error", so I think this is about as good as it is going to get.
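For readers unfamiliar with the pattern being referred to, a generic, self-contained sketch of the futures::stream / buffered shape looks like this (not the DataFusion code, which is where the lifetime error appeared):

use futures::{stream, StreamExt, TryStreamExt};

// Map each input to a spawned task, keep up to 8 in flight, and collect the
// results in the original input order (buffered preserves ordering).
async fn double_all(inputs: Vec<usize>) -> Result<Vec<usize>, String> {
    stream::iter(inputs)
        .map(|x| async move {
            tokio::task::spawn(async move { Ok::<usize, String>(x * 2) })
                .await
                .map_err(|e| e.to_string())?
        })
        .buffered(8)
        .try_collect()
        .await
}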
From my point of view this PR is ready to go -- there are still some suggestions from @metesynnada, but I am not sure if they feel any are required prior to merge.
Co-authored-by: Andrew Lamb <[email protected]>
It would be great to add "write" / "copy" queries into our benchmarks. Since DataFusion only recently started supporting such statements, we don't have the same infrastructure for benchmarking them that we do for queries.
I think all comments have now been addressed. Thank you very much @devinjdangelo and @metesynnada -- this is really cool to see. 🚀
Which issue does this PR close?
Part of #7079
Rationale for this change
Serialization of "stateless" file types (where the serialized bytes of each record batch has no dependency on the serialized bytes of any other record batch), can be parallelized efficiently across all available CPU cores for a significant decrease in the time needed to write out the file.
There is a tradeoff between write speed and memory utilization. If the ObjectStore writer cannot keep up with the data being serialized, bytes could accumulate in memory. ObjectStore puts are concurrent but not parallelized, so the risk of higher memory usage increases as the number of cores in the system increases. We could potentially bound this increase in memory by throttling serialization tasks when more than X RecordBatches have been serialized but not yet written to the ObjectStore (this is done!).
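As a rough, self-contained illustration of that bound (generic code, not the sink implementation itself): a bounded channel makes the serializing side wait once the configured number of serialized-but-unwritten batches is reached.

use tokio::sync::mpsc;

// The producer queues serialized batches; `send` suspends once `cap` results
// are buffered, so at most `cap` serialized batches sit in memory at a time.
async fn pipeline(batches: Vec<Vec<u8>>, cap: usize) -> usize {
    let (tx, mut rx) = mpsc::channel::<Vec<u8>>(cap);
    let producer = tokio::spawn(async move {
        for bytes in batches {
            if tx.send(bytes).await.is_err() {
                break; // consumer went away
            }
        }
    });
    let mut written = 0;
    while let Some(bytes) = rx.recv().await {
        written += bytes.len(); // stand-in for the ObjectStore write
    }
    let _ = producer.await;
    written
}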
What changes are included in this PR?
Spawn a tokio task to serialize each record batch in parallel for "stateless" file types, and concurrently write the serialized bytes to an ObjectStore.
Benchmarking
To benchmark the performance difference in this PR, the following script is used.
cd datafusion/benchmarks
./bench.sh data tpch10
Tables below report results on a 16c/32t development server with 128GB of memory writing to fast local NVMe storage. This is an ideal case; scaling will not be as good on systems with fewer threads, significantly less memory, or slower/remote storage.
Execution Time in Seconds
Peak Memory Usage in GB
After incorporating the channel method written by @metesynnada to introduce backpressure, the memory usage increase is consistently ~200MB in all tests with no noticeable drop in performance.
Note: Peak memory usage figure subtracts the peak memory usage required to cache the DataFrame and not perform any writes.
On a laptop with 15t, I see 28.42s and 42.94s for the 4CSV and 4JSON file tests respectively.
Without the caching step, each write only takes about 1 extra second.
What about Parquet?
Serializing a parquet file is more complex (the parquet writer maintains internal state across record batches). Serializing two different parquet files in parallel is trivial, but parallelizing serialization of a single large parquet file is more difficult. It may be possible by constructing multiple row groups in parallel. I intend to work on this in another PR.
Are these changes tested?
Yes, by existing tests. More tests to verify abort behavior may be desired.
Are there any user-facing changes?
No, just faster writes!