Skip to content

Commit

Permalink
add channel
Browse files Browse the repository at this point in the history
  • Loading branch information
devinjdangelo committed Sep 5, 2023
1 parent bcb2baf commit 584d925
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 5 deletions.
8 changes: 4 additions & 4 deletions datafusion/core/src/datasource/file_format/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ use futures::{ready, StreamExt};
use object_store::path::Path;
use object_store::{MultipartId, ObjectMeta, ObjectStore};
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::task::{self, JoinHandle, JoinSet};
use tokio::sync::mpsc;
use tokio::task::{JoinHandle, JoinSet};

/// `AsyncPutWriter` is an object that facilitates asynchronous writing to object stores.
/// It is specifically designed for the `object_store` crate's `put` method and sends
Expand Down Expand Up @@ -319,7 +320,7 @@ async fn serialize_rb_stream_to_object_store(
),
> {
let (tx, mut rx) =
mpsc::channel::<JoinHandle<Result<(usize, Bytes), DataFusionError>>>(100); // buffer size of 100, adjust as needed
mpsc::channel::<JoinHandle<Result<(usize, Bytes), DataFusionError>>>(100);

let serialize_task = tokio::spawn(async move {
while let Some(maybe_batch) = data_stream.next().await {
Expand All @@ -336,7 +337,6 @@ async fn serialize_rb_stream_to_object_store(
"Unknown error writing to object store".into(),
)
})?;
yield_now().await;
}
Err(_) => {
return Err(DataFusionError::Internal(
Expand Down Expand Up @@ -393,7 +393,7 @@ async fn serialize_rb_stream_to_object_store(
};
Ok((serializer, writer, row_count as u64))
}
}


/// Contains the common logic for serializing RecordBatches and
/// writing the resulting bytes to an ObjectStore.
Expand Down
1 change: 0 additions & 1 deletion datafusion/core/tests/fifo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,6 @@ mod unix_test {

/// It tests the INSERT INTO functionality.
#[tokio::test]
#[ignore]
async fn test_sql_insert_into_fifo() -> Result<()> {
// To make unbounded deterministic
let waiting = Arc::new(AtomicBool::new(true));
Expand Down

0 comments on commit 584d925

Please sign in to comment.