move RecordBatchReader out of geozero code
kylebarron committed May 15, 2024
1 parent c164f6b commit 5a4f730
Showing 10 changed files with 57 additions and 56 deletions.
2 changes: 1 addition & 1 deletion src/io/csv/writer.rs
@@ -1,5 +1,5 @@
use crate::error::Result;
-use crate::io::geozero::RecordBatchReader;
+use crate::io::stream::RecordBatchReader;
use geozero::csv::CsvWriter;
use geozero::GeozeroDatasource;
use std::io::Write;
2 changes: 1 addition & 1 deletion src/io/flatgeobuf/writer.rs
@@ -4,7 +4,7 @@ use flatgeobuf::{FgbWriter, FgbWriterOptions};
use geozero::GeozeroDatasource;

use crate::error::Result;
-use crate::io::geozero::RecordBatchReader;
+use crate::io::stream::RecordBatchReader;
use crate::schema::GeoSchemaExt;

// TODO: always write CRS saved in Table metadata (you can do this by adding an option)
2 changes: 1 addition & 1 deletion src/io/geojson/writer.rs
@@ -1,6 +1,6 @@
use super::geojson_writer::GeoJsonWriter;
use crate::error::Result;
-use crate::io::geozero::table::RecordBatchReader;
+use crate::io::stream::RecordBatchReader;
use geozero::GeozeroDatasource;
use std::io::Write;

2 changes: 1 addition & 1 deletion src/io/geojson_lines/writer.rs
@@ -3,7 +3,7 @@ use geozero::GeozeroDatasource;
use std::io::Write;

use crate::error::Result;
-use crate::io::geozero::RecordBatchReader;
+use crate::io::stream::RecordBatchReader;

/// Write a table to newline-delimited GeoJSON
pub fn write_geojson_lines<W: Write>(table: &mut RecordBatchReader, writer: W) -> Result<()> {
1 change: 0 additions & 1 deletion src/io/geozero/mod.rs
@@ -12,4 +12,3 @@ pub use array::{
ToMultiPolygonArray, ToPointArray, ToPolygonArray,
};
pub use scalar::ToGeometry;
-pub use table::RecordBatchReader;
52 changes: 3 additions & 49 deletions src/io/geozero/table/data_source.rs
@@ -3,69 +3,23 @@
use std::str::FromStr;

use crate::array::geometry::GeometryArray;
-use crate::error::GeoArrowError;
use crate::io::geozero::scalar::process_geometry;
use crate::io::geozero::table::json_encoder::{make_encoder, EncoderOptions};
+use crate::io::stream::RecordBatchReader;
use crate::schema::GeoSchemaExt;
use crate::table::Table;
use crate::trait_::GeometryArrayAccessor;
use arrow::array::AsArray;
use arrow::datatypes::*;
use arrow_array::timezone::Tz;
-use arrow_array::{
-    Array, RecordBatch, RecordBatchIterator, RecordBatchReader as _RecordBatchReader,
-};
+use arrow_array::{Array, RecordBatch};
use arrow_schema::{DataType, Schema};
use geozero::error::GeozeroError;
use geozero::{ColumnValue, FeatureProcessor, GeomProcessor, GeozeroDatasource, PropertyProcessor};

-/// A wrapper around an [arrow_array::RecordBatchReader] so that we can impl the GeozeroDatasource
-/// trait.
-pub struct RecordBatchReader(Option<Box<dyn _RecordBatchReader>>);
-
-impl RecordBatchReader {
-    pub fn new(reader: Box<dyn _RecordBatchReader>) -> Self {
-        Self(Some(reader))
-    }
-
-    pub fn schema(&self) -> Result<SchemaRef, GeoArrowError> {
-        let reader = self
-            .0
-            .as_ref()
-            .ok_or(GeoArrowError::General("Closed stream".to_string()))?;
-        Ok(reader.schema())
-    }
-
-    pub fn take(&mut self) -> Option<Box<dyn _RecordBatchReader>> {
-        self.0.take()
-    }
-}
-
-impl From<Table> for RecordBatchReader {
-    fn from(value: Table) -> Self {
-        let (schema, batches) = value.into_inner();
-        Self(Some(Box::new(RecordBatchIterator::new(
-            batches.into_iter().map(Ok),
-            schema,
-        ))))
-    }
-}
-
-impl From<Box<dyn _RecordBatchReader>> for RecordBatchReader {
-    fn from(value: Box<dyn _RecordBatchReader>) -> Self {
-        Self(Some(value))
-    }
-}
-
-impl From<Box<dyn _RecordBatchReader + Send>> for RecordBatchReader {
-    fn from(value: Box<dyn _RecordBatchReader + Send>) -> Self {
-        Self(Some(value))
-    }
-}
-
impl GeozeroDatasource for RecordBatchReader {
    fn process<P: FeatureProcessor>(&mut self, processor: &mut P) -> Result<(), GeozeroError> {
-        let reader = self.0.take().ok_or(GeozeroError::Dataset(
+        let reader = self.take().ok_or(GeozeroError::Dataset(
            "Cannot read from closed RecordBatchReader".to_string(),
        ))?;
        let schema = reader.schema();
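
Not part of this commit: a small sketch of how the GeozeroDatasource impl that stays in data_source.rs can be driven after the move. The helper name is invented, and it assumes geozero's GeoJsonWriter, whose constructor is taken here to accept a mutable reference to any std::io::Write output.

use geozero::geojson::GeoJsonWriter;
use geozero::GeozeroDatasource;

use crate::io::stream::RecordBatchReader;

// Hypothetical helper (not in this commit): stream every feature in `reader`
// out as GeoJSON by driving the GeozeroDatasource impl above.
fn to_geojson_string(reader: &mut RecordBatchReader) -> Result<String, geozero::error::GeozeroError> {
    let mut buf: Vec<u8> = vec![];
    // `process` consumes the wrapped reader batch by batch and forwards each
    // feature to the processor.
    reader.process(&mut GeoJsonWriter::new(&mut buf))?;
    Ok(String::from_utf8(buf).expect("GeoJsonWriter emits valid UTF-8"))
}
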
1 change: 0 additions & 1 deletion src/io/geozero/table/mod.rs
@@ -3,4 +3,3 @@ mod data_source;
mod json_encoder;

pub use builder::{GeoTableBuilder, GeoTableBuilderOptions};
-pub use data_source::RecordBatchReader;
2 changes: 1 addition & 1 deletion src/io/ipc/writer.rs
@@ -3,7 +3,7 @@ use std::io::Write;
use arrow_ipc::writer::{FileWriter, StreamWriter};

use crate::error::{GeoArrowError, Result};
-use crate::io::geozero::RecordBatchReader;
+use crate::io::stream::RecordBatchReader;

/// Write a Table to an Arrow IPC (Feather v2) file
pub fn write_ipc<W: Write>(table: &mut RecordBatchReader, writer: W) -> Result<()> {
1 change: 1 addition & 0 deletions src/io/mod.rs
@@ -23,4 +23,5 @@ pub mod ipc;
pub mod parquet;
#[cfg(feature = "postgis")]
pub mod postgis;
+pub mod stream;
pub mod wkb;
48 changes: 48 additions & 0 deletions src/io/stream.rs
@@ -0,0 +1,48 @@
+use crate::error::GeoArrowError;
+use crate::table::Table;
+use arrow_array::{RecordBatchIterator, RecordBatchReader as _RecordBatchReader};
+use arrow_schema::SchemaRef;
+
+/// A wrapper around an [arrow_array::RecordBatchReader] so that we can impl the GeozeroDatasource
+/// trait.
+pub struct RecordBatchReader(Option<Box<dyn _RecordBatchReader>>);
+
+impl RecordBatchReader {
+    pub fn new(reader: Box<dyn _RecordBatchReader>) -> Self {
+        Self(Some(reader))
+    }
+
+    pub fn schema(&self) -> Result<SchemaRef, GeoArrowError> {
+        let reader = self
+            .0
+            .as_ref()
+            .ok_or(GeoArrowError::General("Closed stream".to_string()))?;
+        Ok(reader.schema())
+    }
+
+    pub fn take(&mut self) -> Option<Box<dyn _RecordBatchReader>> {
+        self.0.take()
+    }
+}
+
+impl From<Table> for RecordBatchReader {
+    fn from(value: Table) -> Self {
+        let (schema, batches) = value.into_inner();
+        Self(Some(Box::new(RecordBatchIterator::new(
+            batches.into_iter().map(Ok),
+            schema,
+        ))))
+    }
+}
+
+impl From<Box<dyn _RecordBatchReader>> for RecordBatchReader {
+    fn from(value: Box<dyn _RecordBatchReader>) -> Self {
+        Self(Some(value))
+    }
+}
+
+impl From<Box<dyn _RecordBatchReader + Send>> for RecordBatchReader {
+    fn from(value: Box<dyn _RecordBatchReader + Send>) -> Self {
+        Self(Some(value))
+    }
+}
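
Not part of this commit: a minimal usage sketch of the relocated wrapper, written as if from inside the crate. The function and variable names are illustrative only, and it assumes crate::error::Result is the crate's usual alias over GeoArrowError.

use crate::error::Result;
use crate::io::stream::RecordBatchReader;
use crate::table::Table;

// Hypothetical example: wrap a Table and drain it.
fn example(table: Table) -> Result<()> {
    // From<Table> boxes the batches in a RecordBatchIterator.
    let mut reader: RecordBatchReader = table.into();
    // The schema stays accessible while the inner reader is still present.
    let schema = reader.schema()?;
    println!("{} columns", schema.fields().len());
    // `take()` hands out the inner arrow reader; the wrapper is then "closed".
    let inner = reader.take().expect("stream not yet consumed");
    for batch in inner {
        let _batch = batch.expect("error reading batch"); // arrow RecordBatch
    }
    assert!(reader.schema().is_err()); // closed: schema() now errors
    Ok(())
}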
