Skip to content

Commit

Permalink
use Into<RecordBatchReader> for writer API
Browse files Browse the repository at this point in the history
  • Loading branch information
kylebarron committed May 15, 2024
1 parent 5a4f730 commit 57670dc
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 24 deletions.
4 changes: 2 additions & 2 deletions js/src/io/geojson.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ pub fn read_geojson(file: &[u8], batch_size: Option<usize>) -> WasmResult<Table>
#[wasm_bindgen(js_name = writeGeoJSON)]
pub fn write_geojson(table: Table) -> WasmResult<Vec<u8>> {
let (schema, batches) = table.into_inner();
let mut rust_table = geoarrow::table::Table::try_new(schema, batches)?;
let rust_table = geoarrow::table::Table::try_new(schema, batches)?;
let mut output_file: Vec<u8> = vec![];
_write_geojson(&mut rust_table, &mut output_file)?;
_write_geojson(rust_table, &mut output_file)?;
Ok(output_file)
}
6 changes: 3 additions & 3 deletions src/io/csv/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use geozero::GeozeroDatasource;
use std::io::Write;

/// Write a Table to CSV
pub fn write_csv<W: Write>(table: &mut RecordBatchReader, writer: W) -> Result<()> {
pub fn write_csv<W: Write, S: Into<RecordBatchReader>>(stream: S, writer: W) -> Result<()> {
let mut csv_writer = CsvWriter::new(writer);
table.process(&mut csv_writer)?;
stream.into().process(&mut csv_writer)?;
Ok(())
}

Expand All @@ -23,7 +23,7 @@ mod test {

let mut output_buffer = Vec::new();
let writer = BufWriter::new(&mut output_buffer);
write_csv(&mut table.into(), writer).unwrap();
write_csv(&table, writer).unwrap();
let output_string = String::from_utf8(output_buffer).unwrap();
println!("{}", output_string);
}
Expand Down
21 changes: 11 additions & 10 deletions src/io/flatgeobuf/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,33 @@ use crate::schema::GeoSchemaExt;

// TODO: always write CRS saved in Table metadata (you can do this by adding an option)
/// Write a Table to a FlatGeobuf file.
pub fn write_flatgeobuf<W: Write>(
table: &mut RecordBatchReader,
pub fn write_flatgeobuf<W: Write, S: Into<RecordBatchReader>>(
stream: S,
writer: W,
name: &str,
) -> Result<()> {
write_flatgeobuf_with_options(table, writer, name, Default::default())
write_flatgeobuf_with_options(stream, writer, name, Default::default())
}

/// Write a Table to a FlatGeobuf file with specific writer options.
///
/// Note: this `name` argument is what OGR observes as the layer name of the file.
pub fn write_flatgeobuf_with_options<W: Write>(
table: &mut RecordBatchReader,
pub fn write_flatgeobuf_with_options<W: Write, S: Into<RecordBatchReader>>(
stream: S,
writer: W,
name: &str,
options: FgbWriterOptions,
) -> Result<()> {
let mut stream = stream.into();
let mut fgb =
FgbWriter::create_with_options(name, infer_flatgeobuf_geometry_type(table)?, options)?;
table.process(&mut fgb)?;
FgbWriter::create_with_options(name, infer_flatgeobuf_geometry_type(&stream)?, options)?;
stream.process(&mut fgb)?;
fgb.write(writer)?;
Ok(())
}

fn infer_flatgeobuf_geometry_type(table: &RecordBatchReader) -> Result<flatgeobuf::GeometryType> {
let schema = table.schema()?;
fn infer_flatgeobuf_geometry_type(stream: &RecordBatchReader) -> Result<flatgeobuf::GeometryType> {
let schema = stream.schema()?;
let fields = &schema.fields;
let geom_col_idxs = schema.as_ref().geometry_columns();
if geom_col_idxs.len() != 1 {
Expand Down Expand Up @@ -73,7 +74,7 @@ mod test {

let mut output_buffer = Vec::new();
let writer = BufWriter::new(&mut output_buffer);
write_flatgeobuf(&mut table.into(), writer, "name").unwrap();
write_flatgeobuf(&table, writer, "name").unwrap();

let mut reader = Cursor::new(output_buffer);
let new_table = read_flatgeobuf(&mut reader, Default::default()).unwrap();
Expand Down
6 changes: 3 additions & 3 deletions src/io/geojson/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ use std::io::Write;
/// Write a Table to GeoJSON
///
/// Note: Does not reproject to WGS84 for you
pub fn write_geojson<W: Write>(table: &mut RecordBatchReader, writer: W) -> Result<()> {
pub fn write_geojson<W: Write, S: Into<RecordBatchReader>>(stream: S, writer: W) -> Result<()> {
let mut geojson = GeoJsonWriter::new(writer);
table.process(&mut geojson)?;
stream.into().process(&mut geojson)?;
Ok(())
}

Expand All @@ -25,7 +25,7 @@ mod test {

let mut output_buffer = Vec::new();
let writer = BufWriter::new(&mut output_buffer);
write_geojson(&mut table.into(), writer).unwrap();
write_geojson(&table, writer).unwrap();
let output_string = String::from_utf8(output_buffer).unwrap();
println!("{}", output_string);
}
Expand Down
7 changes: 5 additions & 2 deletions src/io/geojson_lines/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ use crate::error::Result;
use crate::io::stream::RecordBatchReader;

/// Write a table to newline-delimited GeoJSON
pub fn write_geojson_lines<W: Write>(table: &mut RecordBatchReader, writer: W) -> Result<()> {
pub fn write_geojson_lines<W: Write, S: Into<RecordBatchReader>>(
stream: S,
writer: W,
) -> Result<()> {
let mut geojson_writer = GeoJsonLineWriter::new(writer);
table.process(&mut geojson_writer)?;
stream.into().process(&mut geojson_writer)?;
Ok(())
}
10 changes: 6 additions & 4 deletions src/io/ipc/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use crate::error::{GeoArrowError, Result};
use crate::io::stream::RecordBatchReader;

/// Write a Table to an Arrow IPC (Feather v2) file
pub fn write_ipc<W: Write>(table: &mut RecordBatchReader, writer: W) -> Result<()> {
let inner = table
pub fn write_ipc<W: Write, S: Into<RecordBatchReader>>(stream: S, writer: W) -> Result<()> {
let inner = stream
.into()
.take()
.ok_or(GeoArrowError::General("Closed stream".to_string()))?;

Expand All @@ -21,8 +22,9 @@ pub fn write_ipc<W: Write>(table: &mut RecordBatchReader, writer: W) -> Result<(
}

/// Write a Table to an Arrow IPC stream
pub fn write_ipc_stream<W: Write>(table: &mut RecordBatchReader, writer: W) -> Result<()> {
let inner = table
pub fn write_ipc_stream<W: Write, S: Into<RecordBatchReader>>(stream: S, writer: W) -> Result<()> {
let inner = stream
.into()
.take()
.ok_or(GeoArrowError::General("Closed stream".to_string()))?;

Expand Down
6 changes: 6 additions & 0 deletions src/io/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ impl From<Table> for RecordBatchReader {
}
}

impl From<&Table> for RecordBatchReader {
fn from(value: &Table) -> Self {
value.clone().into()
}
}

impl From<Box<dyn _RecordBatchReader>> for RecordBatchReader {
fn from(value: Box<dyn _RecordBatchReader>) -> Self {
Self(Some(value))
Expand Down

0 comments on commit 57670dc

Please sign in to comment.