From 238d365bb45924a1fce5f5372b94ca363fd252fa Mon Sep 17 00:00:00 2001
From: Nicholas Roberts
Date: Tue, 18 Jun 2024 23:13:54 +1000
Subject: [PATCH 1/7] Reworked/adapted streaming writes implementation.

TODO: copy over tests
---
 src/common/mod.rs    |  3 +++
 src/common/stream.rs | 31 +++++++++++++++++++++++++++++++
 src/lib.rs           |  3 +++
 src/wasm.rs          | 22 ++++++++++++++++++++++
 src/writer_async.rs  | 38 ++++++++++++++++++++++++++++++++++++++
 5 files changed, 97 insertions(+)
 create mode 100644 src/common/stream.rs
 create mode 100644 src/writer_async.rs

diff --git a/src/common/mod.rs b/src/common/mod.rs
index ca76c9cf..84798af3 100644
--- a/src/common/mod.rs
+++ b/src/common/mod.rs
@@ -2,3 +2,6 @@ pub mod properties;
 
 #[cfg(feature = "async")]
 pub mod fetch;
+
+#[cfg(feature = "async")]
+pub mod stream;
\ No newline at end of file
diff --git a/src/common/stream.rs b/src/common/stream.rs
new file mode 100644
index 00000000..3c056f08
--- /dev/null
+++ b/src/common/stream.rs
@@ -0,0 +1,31 @@
+use futures::AsyncWrite;
+
+pub struct WrappedWritableStream<'writer> {
+    pub stream: wasm_streams::writable::IntoAsyncWrite<'writer>,
+}
+
+impl<'writer> AsyncWrite for WrappedWritableStream<'writer> {
+    fn poll_write(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        buf: &[u8],
+    ) -> std::task::Poll<Result<usize, std::io::Error>> {
+        AsyncWrite::poll_write(std::pin::Pin::new(&mut self.get_mut().stream), cx, buf)
+    }
+
+    fn poll_flush(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), std::io::Error>> {
+        AsyncWrite::poll_flush(std::pin::Pin::new(&mut self.get_mut().stream), cx)
+    }
+
+    fn poll_close(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), std::io::Error>> {
+        AsyncWrite::poll_close(std::pin::Pin::new(&mut self.get_mut().stream), cx)
+    }
+}
+
+unsafe impl<'writer> Send for WrappedWritableStream<'writer> {}
\ No newline at end of file
diff --git a/src/lib.rs b/src/lib.rs
index 035b47d7..05935e61 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -17,6 +17,9 @@ pub mod writer;
 #[cfg(feature = "writer")]
 pub mod writer_properties;
 
+#[cfg(all(feature = "writer", feature = "async"))]
+pub mod writer_async;
+
 // When the `wee_alloc` feature is enabled, use `wee_alloc` as the global
 // allocator.
 /*#[cfg(feature = "wee_alloc")]
diff --git a/src/wasm.rs b/src/wasm.rs
index 136739d1..81e8a8c1 100644
--- a/src/wasm.rs
+++ b/src/wasm.rs
@@ -249,3 +249,25 @@ pub async fn read_parquet_stream(
     });
     Ok(wasm_streams::ReadableStream::from_stream(stream).into_raw())
 }
+
+#[wasm_bindgen(js_name = "transformParquetStream")]
+#[cfg(all(feature = "writer", feature = "async"))]
+pub fn transform_parquet_stream(
+    stream: wasm_streams::readable::sys::ReadableStream,
+    writer_properties: Option<crate::writer_properties::WriterProperties>,
+) -> WasmResult<wasm_streams::readable::sys::ReadableStream> {
+    use futures::StreamExt;
+    use wasm_bindgen::convert::TryFromJsValue;
+    let batches = wasm_streams::ReadableStream::from_raw(stream)
+        .into_stream()
+        .map(|maybe_chunk| {
+            let chunk = maybe_chunk.unwrap();
+            let transformed = arrow_wasm::RecordBatch::try_from_js_value(chunk).unwrap();
+            transformed
+        });
+    let output_stream = super::writer_async::transform_parquet_stream(
+        batches,
+        writer_properties.unwrap_or_default(),
+    );
+    Ok(output_stream.unwrap())
+}
\ No newline at end of file
diff --git a/src/writer_async.rs b/src/writer_async.rs
new file mode 100644
index 00000000..6294cae3
--- /dev/null
+++ b/src/writer_async.rs
@@ -0,0 +1,38 @@
+use crate::error::Result;
+use crate::common::stream::WrappedWritableStream;
+use async_compat::CompatExt;
+use futures::StreamExt;
+use parquet::arrow::async_writer::AsyncArrowWriter;
+use wasm_bindgen_futures::spawn_local;
+
+pub fn transform_parquet_stream(
+    batches: impl futures::Stream<Item = arrow_wasm::RecordBatch> + 'static,
+    writer_properties: crate::writer_properties::WriterProperties,
+) -> Result<wasm_streams::readable::sys::ReadableStream> {
+    let options = Some(writer_properties.into());
+    // let encoding = writer_properties.get_encoding();
+
+    let (writable_stream, output_stream) = {
+        let raw_stream = wasm_streams::transform::sys::TransformStream::new().unwrap();
+        let raw_writable = raw_stream.writable();
+        let inner_writer = wasm_streams::WritableStream::from_raw(raw_writable).into_async_write();
+        let writable_stream = WrappedWritableStream {
+            stream: inner_writer,
+        };
+        (writable_stream, raw_stream.readable())
+    };
+    spawn_local::<_>(async move {
+        let mut adapted_stream = batches.peekable();
+        let mut pinned_stream = std::pin::pin!(adapted_stream);
+        let first_batch = pinned_stream.as_mut().peek().await.unwrap();
+        let schema = first_batch.schema().into_inner();
+        // Need to create an encoding for each column
+        let mut writer =
+            AsyncArrowWriter::try_new(writable_stream.compat(), schema, options).unwrap();
+        while let Some(batch) = pinned_stream.next().await {
+            let _ = writer.write(&batch.into()).await;
+        }
+        let _ = writer.close().await;
+    });
+    Ok(output_stream)
+}
\ No newline at end of file

From 6927ffe89b62789cc3362d7125bb1a25fd56870a Mon Sep 17 00:00:00 2001
From: Nicholas Roberts
Date: Tue, 18 Jun 2024 23:16:14 +1000
Subject: [PATCH 2/7] streaming write tests

---
 tests/js/read-write.test.ts | 20 +++++++++++++++++++-
 1 file changed, 19 insertions(+), 1 deletion(-)

diff --git a/tests/js/read-write.test.ts b/tests/js/read-write.test.ts
index f8a8025b..da76d9f3 100644
--- a/tests/js/read-write.test.ts
+++ b/tests/js/read-write.test.ts
@@ -1,7 +1,7 @@
 import * as wasm from "../../pkg/node/parquet_wasm";
 import { readFileSync } from "fs";
 import { tableFromIPC, tableToIPC } from "apache-arrow";
-import { testArrowTablesEqual, readExpectedArrowData } from "./utils";
+import { testArrowTablesEqual, readExpectedArrowData, temporaryServer } from "./utils";
 import { describe, it, expect } from "vitest";
 
 // Path from repo root
@@ -89,3 +89,21 @@ it("reads empty file", async (t) => {
   expect(table.numCols).toStrictEqual(0);
   // console.log("empty table schema", table.schema);
 });
+
+it("read stream-write stream-read stream round trip (no writer properties provided)", async (t) => {
+  const server = await temporaryServer();
+  const listeningPort = server.addresses()[0].port;
+  const rootUrl = `http://localhost:${listeningPort}`;
+
+  const expectedTable = readExpectedArrowData();
+
+  const url = `${rootUrl}/1-partition-brotli.parquet`;
+  const originalStream = await wasm.readParquetStream(url);
+
+  const stream = await wasm.transformParquetStream(originalStream);
+  const accumulatedBuffer = new Uint8Array(await new Response(stream).arrayBuffer());
+  const roundtripTable = tableFromIPC(wasm.readParquet(accumulatedBuffer).intoIPCStream());
+
+  testArrowTablesEqual(expectedTable, roundtripTable);
+  await server.close();
+});
\ No newline at end of file

From dd672fe3ae2d82aab3c8ac2a04ecfb91d86ca0f7 Mon Sep 17 00:00:00 2001
From: Nicholas Roberts
Date: Tue, 18 Jun 2024 23:30:27 +1000
Subject: [PATCH 3/7] cargo fmt, address clippy recommendations

---
 src/common/mod.rs    | 2 +-
 src/common/stream.rs | 2 +-
 src/wasm.rs          | 5 ++---
 src/writer_async.rs  | 4 ++--
 4 files changed, 6 insertions(+), 7 deletions(-)

diff --git a/src/common/mod.rs b/src/common/mod.rs
index 84798af3..69dae7f4 100644
--- a/src/common/mod.rs
+++ b/src/common/mod.rs
@@ -4,4 +4,4 @@
 pub mod fetch;
 
 #[cfg(feature = "async")]
-pub mod stream;
\ No newline at end of file
+pub mod stream;
diff --git a/src/common/stream.rs b/src/common/stream.rs
index 3c056f08..74474996 100644
--- a/src/common/stream.rs
+++ b/src/common/stream.rs
@@ -28,4 +28,4 @@ impl<'writer> AsyncWrite for WrappedWritableStream<'writer> {
     }
 }
 
-unsafe impl<'writer> Send for WrappedWritableStream<'writer> {}
\ No newline at end of file
+unsafe impl<'writer> Send for WrappedWritableStream<'writer> {}
diff --git a/src/wasm.rs b/src/wasm.rs
index 81e8a8c1..88d84768 100644
--- a/src/wasm.rs
+++ b/src/wasm.rs
@@ -262,12 +262,11 @@ pub fn transform_parquet_stream(
         .into_stream()
         .map(|maybe_chunk| {
             let chunk = maybe_chunk.unwrap();
-            let transformed = arrow_wasm::RecordBatch::try_from_js_value(chunk).unwrap();
-            transformed
+            arrow_wasm::RecordBatch::try_from_js_value(chunk).unwrap()
         });
     let output_stream = super::writer_async::transform_parquet_stream(
         batches,
         writer_properties.unwrap_or_default(),
     );
     Ok(output_stream.unwrap())
-}
\ No newline at end of file
+}
diff --git a/src/writer_async.rs b/src/writer_async.rs
index 6294cae3..ce1148b1 100644
--- a/src/writer_async.rs
+++ b/src/writer_async.rs
@@ -1,5 +1,5 @@
-use crate::error::Result;
 use crate::common::stream::WrappedWritableStream;
+use crate::error::Result;
 use async_compat::CompatExt;
 use futures::StreamExt;
 use parquet::arrow::async_writer::AsyncArrowWriter;
@@ -35,4 +35,4 @@ pub fn transform_parquet_stream(
         let _ = writer.close().await;
     });
     Ok(output_stream)
-}
\ No newline at end of file
+}

From 87c9e3f74a423b2712ac1fd9c9a1f2b488f6285f Mon Sep 17 00:00:00 2001
From: Nicholas Roberts
Date: Fri, 5 Jul 2024 20:14:40 +1000
Subject: [PATCH 4/7] Eliminate the majority of unwraps, distinguish better
 between the various error cases in transform_parquet_stream.

TODO: eliminate unhandled Result returns (writer.write, sender.send,
writer.close).
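A consumer-visible sketch of the new behaviour (illustrative only: the input
stream is deliberately malformed, and the exact import path depends on which
build of the package you target):

```ts
import * as wasm from "parquet-wasm";

// transformParquetStream is now async: failures while setting up the writer
// (an unreadable first chunk, missing TransformStream support) reject the
// returned promise instead of surfacing later as a broken byte stream.
const notRecordBatches = new ReadableStream({
  start(controller) {
    controller.enqueue("not a RecordBatch"); // wrong chunk type on purpose
    controller.close();
  },
});

try {
  await wasm.transformParquetStream(notRecordBatches);
} catch (err) {
  console.error("stream setup failed:", err); // e.g. the new DynCastingError
}
```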
---
 src/error.rs        |  6 +++-
 src/wasm.rs         | 18 ++++++----
 src/writer_async.rs | 83 +++++++++++++++++++++++++++++++--------------
 3 files changed, 74 insertions(+), 33 deletions(-)

diff --git a/src/error.rs b/src/error.rs
index 6be6f4b3..027ba8d2 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -1,7 +1,7 @@
 use arrow::error::ArrowError;
 use parquet::errors::ParquetError;
 use thiserror::Error;
-use wasm_bindgen::JsError;
+use wasm_bindgen::{JsError, JsValue};
 
 #[derive(Error, Debug)]
 pub enum ParquetWasmError {
@@ -15,6 +15,10 @@ pub enum ParquetWasmError {
     #[cfg(feature = "async")]
     #[error("HTTP error: `{0}`")]
     HTTPError(Box<reqwest::Error>),
+    #[error("Platform error: `{0}`")]
+    PlatformSupportError(String),
+    #[error("Dyn casting error")]
+    DynCastingError(JsValue),
 }
 
 pub type Result<T> = std::result::Result<T, ParquetWasmError>;
diff --git a/src/wasm.rs b/src/wasm.rs
index 88d84768..fed5129c 100644
--- a/src/wasm.rs
+++ b/src/wasm.rs
@@ -252,21 +252,25 @@ pub async fn read_parquet_stream(
 
 #[wasm_bindgen(js_name = "transformParquetStream")]
 #[cfg(all(feature = "writer", feature = "async"))]
-pub fn transform_parquet_stream(
+pub async fn transform_parquet_stream(
     stream: wasm_streams::readable::sys::ReadableStream,
     writer_properties: Option<crate::writer_properties::WriterProperties>,
 ) -> WasmResult<wasm_streams::readable::sys::ReadableStream> {
-    use futures::StreamExt;
+    use futures::{StreamExt, TryStreamExt};
     use wasm_bindgen::convert::TryFromJsValue;
+
+    use crate::error::ParquetWasmError;
     let batches = wasm_streams::ReadableStream::from_raw(stream)
         .into_stream()
         .map(|maybe_chunk| {
-            let chunk = maybe_chunk.unwrap();
-            arrow_wasm::RecordBatch::try_from_js_value(chunk).unwrap()
-        });
+            let chunk = maybe_chunk?;
+            arrow_wasm::RecordBatch::try_from_js_value(chunk)
+        })
+        .map_err(|x| ParquetWasmError::DynCastingError(x));
     let output_stream = super::writer_async::transform_parquet_stream(
         batches,
         writer_properties.unwrap_or_default(),
-    );
-    Ok(output_stream.unwrap())
+    )
+    .await;
+    Ok(output_stream?)
 }
diff --git a/src/writer_async.rs b/src/writer_async.rs
index ce1148b1..8bceb04d 100644
--- a/src/writer_async.rs
+++ b/src/writer_async.rs
@@ -1,38 +1,71 @@
 use crate::common::stream::WrappedWritableStream;
-use crate::error::Result;
+use crate::error::{ParquetWasmError, Result};
 use async_compat::CompatExt;
+use futures::channel::oneshot;
 use futures::StreamExt;
 use parquet::arrow::async_writer::AsyncArrowWriter;
 use wasm_bindgen_futures::spawn_local;
 
-pub fn transform_parquet_stream(
-    batches: impl futures::Stream<Item = arrow_wasm::RecordBatch> + 'static,
+pub async fn transform_parquet_stream(
+    batches: impl futures::Stream<Item = Result<arrow_wasm::RecordBatch>> + 'static,
     writer_properties: crate::writer_properties::WriterProperties,
 ) -> Result<wasm_streams::readable::sys::ReadableStream> {
     let options = Some(writer_properties.into());
-    // let encoding = writer_properties.get_encoding();
 
-    let (writable_stream, output_stream) = {
-        let raw_stream = wasm_streams::transform::sys::TransformStream::new().unwrap();
-        let raw_writable = raw_stream.writable();
-        let inner_writer = wasm_streams::WritableStream::from_raw(raw_writable).into_async_write();
-        let writable_stream = WrappedWritableStream {
-            stream: inner_writer,
+    let raw_stream = wasm_streams::transform::sys::TransformStream::new();
+    if let Ok(raw_stream) = raw_stream {
+        let (writable_stream, output_stream) = {
+            let raw_writable = raw_stream.writable();
+            let inner_writer =
+                wasm_streams::WritableStream::from_raw(raw_writable).into_async_write();
+            let writable_stream = WrappedWritableStream {
+                stream: inner_writer,
+            };
+            (writable_stream, raw_stream.readable())
         };
-        (writable_stream, raw_stream.readable())
-    };
-    spawn_local::<_>(async move {
-        let mut adapted_stream = batches.peekable();
-        let mut pinned_stream = std::pin::pin!(adapted_stream);
-        let first_batch = pinned_stream.as_mut().peek().await.unwrap();
-        let schema = first_batch.schema().into_inner();
-        // Need to create an encoding for each column
-        let mut writer =
-            AsyncArrowWriter::try_new(writable_stream.compat(), schema, options).unwrap();
-        while let Some(batch) = pinned_stream.next().await {
-            let _ = writer.write(&batch.into()).await;
+        // construct a channel for the purposes of signalling errors occurring at the start of the stream.
+        // Errors that occur during writing will have to fuse the stream.
+        let (sender, receiver) = oneshot::channel::<Result<()>>();
+        spawn_local(async move {
+            let mut adapted_stream = batches.peekable();
+            let mut pinned_stream = std::pin::pin!(adapted_stream);
+            let first_batch = pinned_stream.as_mut().peek().await;
+            if let Some(Ok(first_batch)) = first_batch {
+                let schema = first_batch.schema().into_inner();
+                let writer = AsyncArrowWriter::try_new(writable_stream.compat(), schema, options);
+                match writer {
+                    Ok(mut writer) => {
+                        // unblock the calling thread's receiver (indicating that stream initialization was error-free)
+                        let _ = sender.send(Ok(()));
+                        while let Some(batch) = pinned_stream.next().await {
+                            if let Ok(batch) = batch {
+                                let _ = writer.write(&batch.into()).await;
+                            }
+                        }
+                        let _ = writer.close().await;
+                    }
+                    Err(err) => {
+                        let _ = sender.send(Err(ParquetWasmError::ParquetError(Box::new(err))));
+                    }
+                }
+                // TODO: handle failed to create arrow writer
+            } else if let Some(Err(err)) = first_batch {
+                let _ = sender.send(Err(ParquetWasmError::DynCastingError(
+                    err.to_string().into(),
+                )));
+            } else {
+                let _ = sender.send(Err(ParquetWasmError::DynCastingError(
+                    "null first batch".to_string().into(),
+                )));
+            }
+        });
+        match receiver.await.unwrap() {
+            Ok(()) => Ok(output_stream),
+            Err(err) => Err(err),
         }
-        let _ = writer.close().await;
-    });
-    Ok(output_stream)
+    } else {
+        Err(ParquetWasmError::PlatformSupportError(
+            "Failed to create TransformStream".to_string(),
+        ))
+    }
 }

From b27ddec7df34c5d88021e5778a44d99c080885cc Mon Sep 17 00:00:00 2001
From: Nicholas Roberts
Date: Fri, 5 Jul 2024 20:41:46 +1000
Subject: [PATCH 5/7] Obey clippy

---
 src/wasm.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/wasm.rs b/src/wasm.rs
index fed5129c..1bce0d04 100644
--- a/src/wasm.rs
+++ b/src/wasm.rs
@@ -266,7 +266,7 @@ pub async fn transform_parquet_stream(
             let chunk = maybe_chunk?;
             arrow_wasm::RecordBatch::try_from_js_value(chunk)
         })
-        .map_err(|x| ParquetWasmError::DynCastingError(x));
+        .map_err(ParquetWasmError::DynCastingError);
     let output_stream = super::writer_async::transform_parquet_stream(
         batches,
         writer_properties.unwrap_or_default(),

From 7c874521aad2dcb20dc644fc5e9f36b325c57787 Mon Sep 17 00:00:00 2001
From: Nicholas Roberts
Date: Sat, 27 Jul 2024 16:49:53 +1000
Subject: [PATCH 6/7] Flesh out transformParquetStream docstring, include a few
 examples focussed on useful streaming flows.
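One flow deliberately kept out of the docstring is sketched here for reviewers
(same hypothetical URL as the docstring examples; buffering defeats the point
of streaming for large files, but it is what the round-trip test does):

```ts
import initWasm, { ParquetFile, transformParquetStream } from "parquet-wasm";

await initWasm();
const file = await ParquetFile.fromUrl("https://example.com/file.parquet");
const recordBatchStream = await file.stream();

const serializedParquetStream = await transformParquetStream(recordBatchStream);
// Response gives a one-liner for draining a web ReadableStream into a buffer
const parquetBytes = new Uint8Array(
  await new Response(serializedParquetStream).arrayBuffer(),
);
```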
---
 src/wasm.rs         | 67 +++++++++++++++++++++++++++++++++++++++++++++
 src/writer_async.rs |  1 -
 2 files changed, 67 insertions(+), 1 deletion(-)

diff --git a/src/wasm.rs b/src/wasm.rs
index 1bce0d04..bde79eff 100644
--- a/src/wasm.rs
+++ b/src/wasm.rs
@@ -250,6 +250,73 @@ pub async fn read_parquet_stream(
     Ok(wasm_streams::ReadableStream::from_stream(stream).into_raw())
 }
 
+
+/// Transform a ReadableStream of RecordBatches to a ReadableStream of bytes
+///
+/// Browser example with piping to a file via the File System API:
+///
+/// ```js
+/// import initWasm, {ParquetFile, transformParquetStream} from "parquet-wasm";
+///
+/// // Instantiate the WebAssembly context
+/// await initWasm();
+///
+/// const fileInstance = await ParquetFile.fromUrl("https://example.com/file.parquet");
+/// const recordBatchStream = await fileInstance.stream();
+/// const serializedParquetStream = await transformParquetStream(recordBatchStream);
+/// // NB: requires transient user activation - you would typically do this before ☝️
+/// const handle = await window.showSaveFilePicker();
+/// const writable = await handle.createWritable();
+/// await serializedParquetStream.pipeTo(writable);
+/// ```
+/// 
+/// NodeJS (ESM) example with piping to a file:
+/// ```js
+/// import { open } from "node:fs/promises";
+/// import { Writable } from "node:stream";
+/// import initWasm, {ParquetFile, transformParquetStream} from "parquet-wasm";
+///
+/// // Instantiate the WebAssembly context
+/// await initWasm();
+/// 
+/// const fileInstance = await ParquetFile.fromUrl("https://example.com/file.parquet");
+/// const recordBatchStream = await fileInstance.stream();
+/// const serializedParquetStream = await transformParquetStream(recordBatchStream);
+/// 
+/// // grab a writable file handle via fsPromises
+/// const handle = await open("file.parquet", "w");
+/// const destinationStream = Writable.toWeb(handle.createWriteStream());
+/// await serializedParquetStream.pipeTo(destinationStream);
+/// 
+/// ```
+/// NB: the above is a little contrived - `await writeFile("file.parquet", serializedParquetStream)`
+/// is enough for most use cases.
+/// 
+/// Browser kitchen sink example - teeing to the Cache API, using as a streaming post body, transferring
+/// to a Web Worker:
+/// ```js
+/// // prelude elided - see above
+/// const serializedParquetStream = await transformParquetStream(recordBatchStream);
+/// const [cacheStream, bodyStream] = serializedParquetStream.tee();
+/// const postProm = fetch(targetUrl, {
+///   method: "POST",
+///   duplex: "half",
+///   body: bodyStream
+/// });
+/// const targetCache = await caches.open("foobar");
+/// await targetCache.put("https://example.com/file.parquet", new Response(cacheStream));
+/// // this could have been done with another tee, but beware of buffering
+/// const workerStream = (await targetCache.match("https://example.com/file.parquet")).body;
+/// const worker = new Worker("worker.js");
+/// worker.postMessage(workerStream, [workerStream]);
+/// await postProm;
+/// ```
+/// 
+/// @param stream A {@linkcode ReadableStream} of {@linkcode RecordBatch} instances
+/// @param writer_properties (optional) Configuration for writing to Parquet. Use the {@linkcode
+/// WriterPropertiesBuilder} to build a writing configuration, then call `.build()` to create an
+/// immutable writer properties to pass in here.
+/// @returns ReadableStream containing serialized Parquet data.
 #[wasm_bindgen(js_name = "transformParquetStream")]
 #[cfg(all(feature = "writer", feature = "async"))]
 pub async fn transform_parquet_stream(
diff --git a/src/writer_async.rs b/src/writer_async.rs
index 8bceb04d..b0d04efd 100644
--- a/src/writer_async.rs
+++ b/src/writer_async.rs
@@ -48,7 +48,6 @@ pub async fn transform_parquet_stream(
                         let _ = sender.send(Err(ParquetWasmError::ParquetError(Box::new(err))));
                     }
                 }
-                // TODO: handle failed to create arrow writer
             } else if let Some(Err(err)) = first_batch {
                 let _ = sender.send(Err(ParquetWasmError::DynCastingError(
                     err.to_string().into(),

From 2930f11c1f28dcfe2da97af28abb3e9268516bf2 Mon Sep 17 00:00:00 2001
From: Nicholas Roberts
Date: Sat, 27 Jul 2024 17:12:40 +1000
Subject: [PATCH 7/7] Obey the linter

---
 src/wasm.rs | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)

diff --git a/src/wasm.rs b/src/wasm.rs
index bde79eff..621e69f6 100644
--- a/src/wasm.rs
+++ b/src/wasm.rs
@@ -250,7 +250,6 @@ pub async fn read_parquet_stream(
     Ok(wasm_streams::ReadableStream::from_stream(stream).into_raw())
 }
 
-
 /// Transform a ReadableStream of RecordBatches to a ReadableStream of bytes
 ///
 /// Browser example with piping to a file via the File System API:
@@ -269,7 +268,7 @@ pub async fn read_parquet_stream(
 /// const writable = await handle.createWritable();
 /// await serializedParquetStream.pipeTo(writable);
 /// ```
-/// 
+///
 /// NodeJS (ESM) example with piping to a file:
 /// ```js
 /// import { open } from "node:fs/promises";
 /// import { Writable } from "node:stream";
 /// import initWasm, {ParquetFile, transformParquetStream} from "parquet-wasm";
 ///
 /// // Instantiate the WebAssembly context
 /// await initWasm();
-/// 
+///
 /// const fileInstance = await ParquetFile.fromUrl("https://example.com/file.parquet");
 /// const recordBatchStream = await fileInstance.stream();
 /// const serializedParquetStream = await transformParquetStream(recordBatchStream);
-/// 
+///
 /// // grab a writable file handle via fsPromises
 /// const handle = await open("file.parquet", "w");
 /// const destinationStream = Writable.toWeb(handle.createWriteStream());
 /// await serializedParquetStream.pipeTo(destinationStream);
-/// 
+///
 /// ```
 /// NB: the above is a little contrived - `await writeFile("file.parquet", serializedParquetStream)`
 /// is enough for most use cases.
-/// 
+///
 /// Browser kitchen sink example - teeing to the Cache API, using as a streaming post body, transferring
 /// to a Web Worker:
 /// ```js
 /// // prelude elided - see above
 /// const serializedParquetStream = await transformParquetStream(recordBatchStream);
 /// const [cacheStream, bodyStream] = serializedParquetStream.tee();
 /// const postProm = fetch(targetUrl, {
 ///   method: "POST",
 ///   duplex: "half",
 ///   body: bodyStream
 /// });
 /// const targetCache = await caches.open("foobar");
 /// await targetCache.put("https://example.com/file.parquet", new Response(cacheStream));
 /// // this could have been done with another tee, but beware of buffering
 /// const workerStream = (await targetCache.match("https://example.com/file.parquet")).body;
 /// const worker = new Worker("worker.js");
 /// worker.postMessage(workerStream, [workerStream]);
 /// await postProm;
 /// ```
-/// 
+///
 /// @param stream A {@linkcode ReadableStream} of {@linkcode RecordBatch} instances
 /// @param writer_properties (optional) Configuration for writing to Parquet. Use the {@linkcode
 /// WriterPropertiesBuilder} to build a writing configuration, then call `.build()` to create an