Ensure the CloudWriter finishes its job after writers
The idea is to avoid depending only on "Drop" to shut the writer down. This also
matters because we need to verify that the file was actually created.
philss committed Jul 22, 2024
1 parent 142155b commit 739bc82
Showing 3 changed files with 71 additions and 10 deletions.
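Why an explicit `finish` instead of relying on `Drop`? A minimal, self-contained
sketch of the underlying Rust constraint (not Explorer's actual code): `Drop::drop`
returns nothing, so any error hit while shutting the writer down inside `drop` is
silently discarded, whereas an explicit method can return a `Result` the caller
must handle.

// Sketch: an explicit finish() surfaces errors that Drop must swallow.
struct Upload {
    finished: bool,
}

impl Upload {
    // Explicit finish: the caller receives the error and can react.
    fn finish(&mut self) -> Result<(), String> {
        self.finished = true;
        // Stand-in for "head request failed": pretend the object is missing.
        Err("upload failed: object not found".into())
    }
}

impl Drop for Upload {
    fn drop(&mut self) {
        if !self.finished {
            // Best effort only: there is no channel to report an error here.
            let _ = self.finish();
        }
    }
}

fn main() {
    let mut upload = Upload { finished: false };
    // Calling finish() explicitly makes the failure observable...
    if let Err(e) = upload.finish() {
        eprintln!("{e}");
    }
    // ...whereas simply dropping `upload` would have swallowed it.
}

This is why the commit keeps the `Drop` impl only as a best-effort fallback and
routes the happy path through `finish`.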
59 changes: 52 additions & 7 deletions native/explorer/src/cloud_writer.rs
@@ -1,10 +1,17 @@
 use crate::ExplorerError;
 use object_store::path::Path;
-use object_store::ObjectStore;
+use object_store::{ObjectMeta, ObjectStore};
+use std::sync::Arc;

 use object_store::buffered::BufWriter as OSBufWriter;
 use tokio::io::AsyncWriteExt;

+#[derive(Debug, PartialEq)]
+enum CloudWriterStatus {
+    Running,
+    Stopped,
+    Aborted,
+}
 /// CloudWriter wraps the asynchronous interface of [ObjectStore's BufWriter](https://docs.rs/object_store/latest/object_store/buffered/struct.BufWriter.html)
 /// in a synchronous interface which implements `std::io::Write`.
 ///
@@ -15,6 +22,12 @@ pub struct CloudWriter {
     runtime: tokio::runtime::Runtime,
     // Internal writer, constructed at creation
     writer: OSBufWriter,
+    // A copy of the object store, so we can check the file after writing
+    object_store: Arc<dyn ObjectStore>,
+    // Keep the path to the file, so we can use it to make a head request
+    path: Path,
+    // Private status of the current writer
+    status: CloudWriterStatus,
 }

 impl CloudWriter {
@@ -27,8 +40,34 @@ impl CloudWriter {
             .enable_time()
             .enable_io()
             .build()?;
-        let writer = OSBufWriter::new(object_store, path);
-        Ok(CloudWriter { writer, runtime })
+        let writer = OSBufWriter::new(object_store.clone(), path.clone());
+
+        Ok(CloudWriter {
+            writer,
+            runtime,
+            object_store,
+            path,
+            status: CloudWriterStatus::Running,
+        })
     }
+
+    /// Make a head request to check if the upload has finished.
+    pub fn finish(&mut self) -> Result<ObjectMeta, ExplorerError> {
+        if self.status != CloudWriterStatus::Stopped {
+            self.status = CloudWriterStatus::Stopped;
+            let _ = self.runtime.block_on(self.writer.shutdown());
+            self.runtime
+                .block_on(self.object_store.head(&self.path))
+                .map_err(|err| {
+                    ExplorerError::Other(format!(
+                        "cannot read information from file, which means the upload failed. {err}"
+                    ))
+                })
+        } else {
+            Err(ExplorerError::Other(
+                "cannot finish cloud writer due to an error, or it was already finished.".into(),
+            ))
+        }
+    }
 }

@@ -41,11 +80,13 @@ impl std::io::Write for CloudWriter {
         let buf = unsafe { std::mem::transmute::<&[u8], &'static [u8]>(buf) };

         self.runtime.block_on(async {
-            let res = self.writer.write_all(buf).await;
+            // TODO: use writer.put to avoid copying data
+            let res = self.writer.write(buf).await;
             if res.is_err() {
                 let _ = self.writer.abort().await;
+                self.status = CloudWriterStatus::Aborted;
             }
-            Ok(buf.len())
+            res
         })
     }

@@ -54,14 +95,18 @@
             let res = self.writer.flush().await;
             if res.is_err() {
                 let _ = self.writer.abort().await;
+                self.status = CloudWriterStatus::Aborted;
             }
-            Ok(())
+            res
         })
     }
 }

 impl Drop for CloudWriter {
     fn drop(&mut self) {
-        let _ = self.runtime.block_on(self.writer.shutdown());
+        if self.status != CloudWriterStatus::Stopped {
+            self.status = CloudWriterStatus::Stopped;
+            let _ = self.runtime.block_on(self.writer.shutdown());
+        }
     }
 }
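For reference, a hedged sketch of what the new `finish` check amounts to: after
the buffered writer shuts down, a `head` request against the object store must
succeed, otherwise the upload is treated as failed. The example below uses
`object_store`'s in-memory backend so it runs without cloud credentials; the
crate setup (`object_store` plus `tokio` with the macros feature) and the path
name are assumptions for illustration, not part of this commit.

// Sketch: the head-request verification performed by CloudWriter::finish.
use object_store::{memory::InMemory, path::Path, ObjectStore};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let store = InMemory::new();
    let path = Path::from("test-writes/wine.ipc");

    // Nothing uploaded yet: `head` errors -- the case `finish` maps to
    // "cannot read information from file, which means the upload failed."
    assert!(store.head(&path).await.is_err());

    // After a successful put, `head` returns the object's metadata.
    store.put(&path, "some bytes".into()).await?;
    let meta = store.head(&path).await?;
    println!("uploaded {} bytes to {}", meta.size, meta.location);
    Ok(())
}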
14 changes: 14 additions & 0 deletions native/explorer/src/dataframe/io.rs
@@ -113,6 +113,9 @@ pub fn df_to_csv_cloud(
         .include_header(include_headers)
         .with_separator(delimiter)
         .finish(&mut data.clone())?;
+
+    let _ = cloud_writer.finish()?;
+
     Ok(())
 }

@@ -244,6 +247,8 @@ pub fn df_to_parquet_cloud(
         .with_compression(compression)
         .finish(&mut data.clone())?;

+    let _ = cloud_writer.finish()?;
+
     Ok(())
 }

@@ -365,6 +370,9 @@ pub fn df_to_ipc_cloud(
     IpcWriter::new(&mut cloud_writer)
         .with_compression(compression)
         .finish(&mut data.clone())?;
+
+    let _ = cloud_writer.finish()?;
+
     Ok(())
 }

@@ -467,6 +475,9 @@ pub fn df_to_ipc_stream_cloud(
     IpcStreamWriter::new(&mut cloud_writer)
         .with_compression(compression)
         .finish(&mut data.clone())?;
+
+    let _ = cloud_writer.finish()?;
+
     Ok(())
 }

@@ -549,6 +560,9 @@ pub fn df_to_ndjson_cloud(data: ExDataFrame, ex_entry: ExS3Entry) -> Result<(),
     JsonWriter::new(&mut cloud_writer)
         .with_json_format(JsonFormat::JsonLines)
        .finish(&mut data.clone())?;
+
+    let _ = cloud_writer.finish()?;
+
     Ok(())
 }

8 changes: 5 additions & 3 deletions test/explorer/data_frame/ipc_test.exs
@@ -179,7 +179,6 @@ defmodule Explorer.DataFrame.IPCTest do
     end

     @tag :cloud_integration
-    @tag :skip
     test "cannot write an IPC file to S3 if bucket does not exist", %{
       df: df,
       s3_config: s3_config
@@ -189,8 +188,11 @@

       assert {:error, error} = DF.to_ipc(df, path, config: s3_config)

-      assert error ==
-               RuntimeError.exception("Generic Error: Could not put multipart to path " <> key)
+      assert %RuntimeError{message: message} = error
+
+      assert message =~ "cannot read information from file, which means the upload failed."
+      assert message =~ "Object at location test-writes/wine"
+      assert message =~ "Client error with status 404 Not Found: No Body"
     end
   end
