diff --git a/native/explorer/src/cloud_writer.rs b/native/explorer/src/cloud_writer.rs index 509633e8c..fecced8c8 100644 --- a/native/explorer/src/cloud_writer.rs +++ b/native/explorer/src/cloud_writer.rs @@ -1,10 +1,17 @@ use crate::ExplorerError; use object_store::path::Path; -use object_store::ObjectStore; +use object_store::{ObjectMeta, ObjectStore}; use std::sync::Arc; use object_store::buffered::BufWriter as OSBufWriter; use tokio::io::AsyncWriteExt; + +#[derive(Debug, PartialEq)] +enum CloudWriterStatus { + Running, + Stopped, + Aborted, +} /// CloudWriter wraps the asynchronous interface of [ObjectStore's BufWriter](https://docs.rs/object_store/latest/object_store/buffered/struct.BufWriter.html) /// in a synchronous interface which implements `std::io::Write`. /// @@ -15,6 +22,12 @@ pub struct CloudWriter { runtime: tokio::runtime::Runtime, // Internal writer, constructed at creation writer: OSBufWriter, + // The copy of the object_store + object_store: Arc, + // Keep the path for the file, so we can use to read head. + path: Path, + // Private status of the current writer + status: CloudWriterStatus, } impl CloudWriter { @@ -27,8 +40,34 @@ impl CloudWriter { .enable_time() .enable_io() .build()?; - let writer = OSBufWriter::new(object_store, path); - Ok(CloudWriter { writer, runtime }) + let writer = OSBufWriter::new(object_store.clone(), path.clone()); + + Ok(CloudWriter { + writer, + runtime, + object_store, + path, + status: CloudWriterStatus::Running, + }) + } + + /// Make a head request to check if the upload has finished. + pub fn finish(&mut self) -> Result { + if self.status != CloudWriterStatus::Stopped { + self.status = CloudWriterStatus::Stopped; + let _ = self.runtime.block_on(self.writer.shutdown()); + self.runtime + .block_on(self.object_store.head(&self.path)) + .map_err(|err| { + ExplorerError::Other(format!( + "cannot read information from file, which means the upload failed. {err}" + )) + }) + } else { + Err(ExplorerError::Other( + "cannot finish cloud writer due to an error, or it was already finished.".into(), + )) + } } } @@ -41,11 +80,13 @@ impl std::io::Write for CloudWriter { let buf = unsafe { std::mem::transmute::<&[u8], &'static [u8]>(buf) }; self.runtime.block_on(async { - let res = self.writer.write_all(buf).await; + // TODO: use writer.put to avoid copying data + let res = self.writer.write(buf).await; if res.is_err() { let _ = self.writer.abort().await; + self.status = CloudWriterStatus::Aborted; } - Ok(buf.len()) + res }) } @@ -54,14 +95,18 @@ impl std::io::Write for CloudWriter { let res = self.writer.flush().await; if res.is_err() { let _ = self.writer.abort().await; + self.status = CloudWriterStatus::Aborted; } - Ok(()) + res }) } } impl Drop for CloudWriter { fn drop(&mut self) { - let _ = self.runtime.block_on(self.writer.shutdown()); + if self.status != CloudWriterStatus::Stopped { + self.status = CloudWriterStatus::Stopped; + let _ = self.runtime.block_on(self.writer.shutdown()); + } } } diff --git a/native/explorer/src/dataframe/io.rs b/native/explorer/src/dataframe/io.rs index 67b2c4fc7..676b36c68 100644 --- a/native/explorer/src/dataframe/io.rs +++ b/native/explorer/src/dataframe/io.rs @@ -113,6 +113,9 @@ pub fn df_to_csv_cloud( .include_header(include_headers) .with_separator(delimiter) .finish(&mut data.clone())?; + + let _ = cloud_writer.finish()?; + Ok(()) } @@ -244,6 +247,8 @@ pub fn df_to_parquet_cloud( .with_compression(compression) .finish(&mut data.clone())?; + let _ = cloud_writer.finish()?; + Ok(()) } @@ -365,6 +370,9 @@ pub fn df_to_ipc_cloud( IpcWriter::new(&mut cloud_writer) .with_compression(compression) .finish(&mut data.clone())?; + + let _ = cloud_writer.finish()?; + Ok(()) } @@ -467,6 +475,9 @@ pub fn df_to_ipc_stream_cloud( IpcStreamWriter::new(&mut cloud_writer) .with_compression(compression) .finish(&mut data.clone())?; + + let _ = cloud_writer.finish()?; + Ok(()) } @@ -549,6 +560,9 @@ pub fn df_to_ndjson_cloud(data: ExDataFrame, ex_entry: ExS3Entry) -> Result<(), JsonWriter::new(&mut cloud_writer) .with_json_format(JsonFormat::JsonLines) .finish(&mut data.clone())?; + + let _ = cloud_writer.finish()?; + Ok(()) } diff --git a/test/explorer/data_frame/ipc_test.exs b/test/explorer/data_frame/ipc_test.exs index d417c27fe..d4dce332c 100644 --- a/test/explorer/data_frame/ipc_test.exs +++ b/test/explorer/data_frame/ipc_test.exs @@ -179,7 +179,6 @@ defmodule Explorer.DataFrame.IPCTest do end @tag :cloud_integration - @tag :skip test "cannot write an IPC file to S3 if bucket does not exist", %{ df: df, s3_config: s3_config @@ -189,8 +188,11 @@ defmodule Explorer.DataFrame.IPCTest do assert {:error, error} = DF.to_ipc(df, path, config: s3_config) - assert error == - RuntimeError.exception("Generic Error: Could not put multipart to path " <> key) + assert %RuntimeError{message: message} = error + + assert message =~ "cannot read information from file, which means the upload failed." + assert message =~ "Object at location test-writes/wine" + assert message =~ "Client error with status 404 Not Found: No Body" end end