From abdd4ac9ee507609e2cad95f521caeb52c7e790a Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 23 Dec 2024 23:27:35 -0500 Subject: [PATCH] Async iterator compiles --- rust/geoarrow/src/io/flatgeobuf/mod.rs | 2 +- .../src/io/flatgeobuf/reader/async.rs | 195 ++++++++++++++---- .../src/io/flatgeobuf/reader/common.rs | 40 +++- rust/geoarrow/src/io/flatgeobuf/reader/mod.rs | 2 +- .../geoarrow/src/io/flatgeobuf/reader/sync.rs | 47 +---- 5 files changed, 202 insertions(+), 84 deletions(-) diff --git a/rust/geoarrow/src/io/flatgeobuf/mod.rs b/rust/geoarrow/src/io/flatgeobuf/mod.rs index 85acb33d..06881193 100644 --- a/rust/geoarrow/src/io/flatgeobuf/mod.rs +++ b/rust/geoarrow/src/io/flatgeobuf/mod.rs @@ -4,6 +4,6 @@ mod reader; mod writer; #[cfg(feature = "flatgeobuf_async")] -pub use reader::read_flatgeobuf_async; +pub use reader::{read_flatgeobuf_async, FlatGeobufStreamBuilder, FlatGeobufStreamReader}; pub use reader::{FlatGeobufReader, FlatGeobufReaderBuilder, FlatGeobufReaderOptions}; pub use writer::{write_flatgeobuf, write_flatgeobuf_with_options, FlatGeobufWriterOptions}; diff --git a/rust/geoarrow/src/io/flatgeobuf/reader/async.rs b/rust/geoarrow/src/io/flatgeobuf/reader/async.rs index 5bf7de40..0598bd2e 100644 --- a/rust/geoarrow/src/io/flatgeobuf/reader/async.rs +++ b/rust/geoarrow/src/io/flatgeobuf/reader/async.rs @@ -1,10 +1,11 @@ +use std::future::Future; +use std::pin::Pin; use std::sync::Arc; use arrow_array::RecordBatch; use arrow_schema::{Schema, SchemaRef}; -use async_stream::stream; -use flatgeobuf::{AsyncFeatureIter, FgbFeature, GeometryType, HttpFgbReader}; -use futures::future::BoxFuture; +use flatgeobuf::{AsyncFeatureIter, GeometryType, HttpFgbReader}; +use futures::task::{Context, Poll}; use futures::Stream; use geozero::{FeatureProcessor, FeatureProperties}; use http_range_client::{AsyncBufferedHttpRangeClient, AsyncHttpRangeClient}; @@ -16,7 +17,9 @@ use crate::array::metadata::ArrayMetadata; use crate::array::*; use crate::datatypes::{Dimension, NativeType}; use crate::error::{GeoArrowError, Result}; -use crate::io::flatgeobuf::reader::common::{infer_schema, parse_crs, FlatGeobufReaderOptions}; +use crate::io::flatgeobuf::reader::common::{ + infer_from_header, infer_schema, parse_crs, FlatGeobufReaderOptions, +}; use crate::io::flatgeobuf::reader::object_store_reader::ObjectStoreWrapper; use crate::io::geozero::array::GeometryStreamBuilder; use crate::io::geozero::table::{GeoTableBuilder, GeoTableBuilderOptions}; @@ -28,52 +31,62 @@ pub struct FlatGeobufStreamBuilder { } impl FlatGeobufStreamBuilder { - // pub async fn open(reader: T) -> Result { - // HttpFgbReader::new(client) - // todo!() - // } - - pub async fn open_existing(reader: AsyncBufferedHttpRangeClient) -> Result { + /// Create a new [FlatGeobufStreamBuilder] from an [AsyncBufferedHttpRangeClient] + pub async fn new(reader: AsyncBufferedHttpRangeClient) -> Result { let reader = HttpFgbReader::new(reader).await.unwrap(); Ok(Self { reader }) } - pub async fn read(self) { - let features = self.reader.select_all().await.unwrap(); + /// Create a new [FlatGeobufStreamBuilder] directly from a client. + pub async fn open_with(reader: T, url: &str) -> Result { + let client = AsyncBufferedHttpRangeClient::with(reader, url); + Self::new(client).await + } + + /// Read from the FlatGeobuf file + pub async fn read(self, options: FlatGeobufReaderOptions) -> Result> { + let (data_type, properties_schema, array_metadata) = + infer_from_header(self.reader.header())?; + if let Some((min_x, min_y, max_x, max_y)) = options.bbox { + let selection = self.reader.select_bbox(min_x, min_y, max_x, max_y).await?; + let num_rows = selection.features_count(); + Ok(FlatGeobufStreamReader { + selection, + data_type, + batch_size: options.batch_size.unwrap_or(65_536), + properties_schema, + num_rows_remaining: num_rows, + array_metadata, + }) + } else { + let selection = self.reader.select_all().await?; + let num_rows = selection.features_count(); + Ok(FlatGeobufStreamReader { + selection, + data_type, + batch_size: options.batch_size.unwrap_or(65_536), + properties_schema, + num_rows_remaining: num_rows, + array_metadata, + }) + } } } impl FlatGeobufStreamBuilder { - pub async fn open_store( - reader: Arc, - location: Path, - options: FlatGeobufReaderOptions, - ) -> Result { - let head = reader.head(&location).await?; - + /// Create a [FlatGeobufStreamBuilder] from an [ObjectStore] instance. + pub async fn open_store(store: Arc, location: Path) -> Result { + let head = store.head(&location).await?; let object_store_wrapper = ObjectStoreWrapper { - reader, + reader: store, location, size: head.size, }; - let async_client = AsyncBufferedHttpRangeClient::with(object_store_wrapper, ""); - - Self::open_existing(async_client).await + Self::new(async_client).await } } -enum StreamState { - /// At the start of a new row group, or the end of the FlatGeobuf stream - Init, - /// Decoding a batch - Decoding, - /// Reading data from input - Reading(BoxFuture<'static, ()>), - /// Error - Error, -} - /// An iterator over record batches from a FlatGeobuf file. /// /// This implements [arrow_array::RecordBatchReader], which you can use to access data. @@ -84,11 +97,11 @@ pub struct FlatGeobufStreamReader { properties_schema: SchemaRef, num_rows_remaining: Option, array_metadata: Arc, - state: StreamState, } impl FlatGeobufStreamReader { - fn schema(&self) -> SchemaRef { + /// Access the schema of the batches emitted from this stream. + pub fn schema(&self) -> SchemaRef { let geom_field = self.data_type .to_field_with_metadata("geometry", true, &self.array_metadata); @@ -100,11 +113,113 @@ impl FlatGeobufStreamReader { )) } - fn into_stream(self) -> impl Stream>> { - stream! { - for await value in self.selection { - yield value + fn construct_options(&self) -> GeoTableBuilderOptions { + let coord_type = self.data_type.coord_type(); + let mut batch_size = self.batch_size; + if let Some(num_rows_remaining) = self.num_rows_remaining { + batch_size = batch_size.min(num_rows_remaining); + } + GeoTableBuilderOptions::new( + coord_type, + false, + Some(batch_size), + Some(self.properties_schema.clone()), + self.num_rows_remaining, + self.array_metadata.clone(), + ) + } + + async fn next_batch(&mut self) -> Result> { + let options = self.construct_options(); + let batch_size = options.batch_size; + + macro_rules! impl_read { + ($builder:expr) => {{ + let mut row_count = 0; + loop { + if row_count >= batch_size { + let (batches, _schema) = $builder.finish()?.into_inner(); + assert_eq!(batches.len(), 1); + return Ok(Some(batches.into_iter().next().unwrap())); + } + + if let Some(feature) = self.selection.next().await? { + feature.process_properties(&mut $builder)?; + $builder.properties_end()?; + + $builder.push_geometry(feature.geometry_trait()?.as_ref())?; + + $builder.feature_end(0)?; + row_count += 1; + } else { + return Ok(None); + } + } + }}; + } + + match self.data_type { + NativeType::Point(_, dim) => { + let mut builder = GeoTableBuilder::::new_with_options(dim, options); + impl_read!(builder) + } + NativeType::LineString(_, dim) => { + let mut builder = + GeoTableBuilder::::new_with_options(dim, options); + impl_read!(builder) + } + NativeType::Polygon(_, dim) => { + let mut builder = GeoTableBuilder::::new_with_options(dim, options); + impl_read!(builder) + } + NativeType::MultiPoint(_, dim) => { + let mut builder = + GeoTableBuilder::::new_with_options(dim, options); + impl_read!(builder) + } + NativeType::MultiLineString(_, dim) => { + let mut builder = + GeoTableBuilder::::new_with_options(dim, options); + impl_read!(builder) } + NativeType::MultiPolygon(_, dim) => { + let mut builder = + GeoTableBuilder::::new_with_options(dim, options); + impl_read!(builder) + } + NativeType::Geometry(_) | NativeType::GeometryCollection(_, _) => { + let mut builder = GeoTableBuilder::::new_with_options( + // TODO: I think this is unused? remove. + Dimension::XY, + options, + ); + impl_read!(builder) + } + geom_type => Err(GeoArrowError::NotYetImplemented(format!( + "Parsing FlatGeobuf from {:?} geometry type not yet supported", + geom_type + ))), + } + } +} + +impl Stream for FlatGeobufStreamReader +where + T: AsyncHttpRangeClient + Unpin + Send, +{ + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let future = self.next_batch(); + futures::pin_mut!(future); + + match future.poll(cx) { + Poll::Ready(Ok(Some(feature))) => Poll::Ready(Some(Ok(feature))), + Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))), + // End of stream + Poll::Ready(Ok(None)) => Poll::Ready(None), + // Still waiting + Poll::Pending => Poll::Pending, } } } diff --git a/rust/geoarrow/src/io/flatgeobuf/reader/common.rs b/rust/geoarrow/src/io/flatgeobuf/reader/common.rs index 3b02f6d2..346c80f2 100644 --- a/rust/geoarrow/src/io/flatgeobuf/reader/common.rs +++ b/rust/geoarrow/src/io/flatgeobuf/reader/common.rs @@ -2,11 +2,13 @@ use std::collections::HashMap; use std::sync::Arc; use arrow_schema::{DataType, Field, SchemaBuilder, SchemaRef, TimeUnit}; -use flatgeobuf::{ColumnType, Crs, Header}; +use flatgeobuf::{ColumnType, Crs, GeometryType, Header}; use serde_json::Value; use crate::array::metadata::{ArrayMetadata, CRSType}; use crate::array::CoordType; +use crate::datatypes::{Dimension, NativeType}; +use crate::error::{GeoArrowError, Result}; /// Options for the FlatGeobuf reader #[derive(Debug, Clone)] @@ -101,3 +103,39 @@ pub(super) fn parse_crs(crs: Option>) -> Arc { Default::default() } + +pub(super) fn infer_from_header( + header: Header<'_>, +) -> Result<(NativeType, SchemaRef, Arc)> { + use Dimension::*; + + if header.has_m() | header.has_t() | header.has_tm() { + return Err(GeoArrowError::General( + "Only XY and XYZ dimensions are supported".to_string(), + )); + } + let has_z = header.has_z(); + + let properties_schema = infer_schema(header); + let geometry_type = header.geometry_type(); + let array_metadata = parse_crs(header.crs()); + // TODO: pass through arg + let coord_type = CoordType::Interleaved; + let data_type = match (geometry_type, has_z) { + (GeometryType::Point, false) => NativeType::Point(coord_type, XY), + (GeometryType::LineString, false) => NativeType::LineString(coord_type, XY), + (GeometryType::Polygon, false) => NativeType::Polygon(coord_type, XY), + (GeometryType::MultiPoint, false) => NativeType::MultiPoint(coord_type, XY), + (GeometryType::MultiLineString, false) => NativeType::MultiLineString(coord_type, XY), + (GeometryType::MultiPolygon, false) => NativeType::MultiPolygon(coord_type, XY), + (GeometryType::Point, true) => NativeType::Point(coord_type, XYZ), + (GeometryType::LineString, true) => NativeType::LineString(coord_type, XYZ), + (GeometryType::Polygon, true) => NativeType::Polygon(coord_type, XYZ), + (GeometryType::MultiPoint, true) => NativeType::MultiPoint(coord_type, XYZ), + (GeometryType::MultiLineString, true) => NativeType::MultiLineString(coord_type, XYZ), + (GeometryType::MultiPolygon, true) => NativeType::MultiPolygon(coord_type, XYZ), + (GeometryType::Unknown, _) => NativeType::Geometry(coord_type), + _ => panic!("Unsupported type"), + }; + Ok((data_type, properties_schema, array_metadata)) +} diff --git a/rust/geoarrow/src/io/flatgeobuf/reader/mod.rs b/rust/geoarrow/src/io/flatgeobuf/reader/mod.rs index 250aebad..5f4a6e66 100644 --- a/rust/geoarrow/src/io/flatgeobuf/reader/mod.rs +++ b/rust/geoarrow/src/io/flatgeobuf/reader/mod.rs @@ -7,5 +7,5 @@ mod sync; pub use common::FlatGeobufReaderOptions; #[cfg(feature = "flatgeobuf_async")] -pub use r#async::read_flatgeobuf_async; +pub use r#async::{read_flatgeobuf_async, FlatGeobufStreamBuilder, FlatGeobufStreamReader}; pub use sync::{FlatGeobufReader, FlatGeobufReaderBuilder}; diff --git a/rust/geoarrow/src/io/flatgeobuf/reader/sync.rs b/rust/geoarrow/src/io/flatgeobuf/reader/sync.rs index c84ba6f3..dec2dd71 100644 --- a/rust/geoarrow/src/io/flatgeobuf/reader/sync.rs +++ b/rust/geoarrow/src/io/flatgeobuf/reader/sync.rs @@ -23,14 +23,12 @@ use crate::array::metadata::ArrayMetadata; use crate::array::*; use crate::datatypes::{Dimension, NativeType}; use crate::error::{GeoArrowError, Result}; -use crate::io::flatgeobuf::reader::common::{infer_schema, parse_crs, FlatGeobufReaderOptions}; +use crate::io::flatgeobuf::reader::common::{infer_from_header, FlatGeobufReaderOptions}; use crate::io::geozero::array::GeometryStreamBuilder; use crate::io::geozero::table::{GeoTableBuilder, GeoTableBuilderOptions}; use arrow_array::{RecordBatch, RecordBatchReader}; use arrow_schema::{ArrowError, Schema, SchemaRef}; -use flatgeobuf::{ - FallibleStreamingIterator, FeatureIter, FgbReader, GeometryType, NotSeekable, Seekable, -}; +use flatgeobuf::{FallibleStreamingIterator, FeatureIter, FgbReader, NotSeekable, Seekable}; use geozero::{FeatureProcessor, FeatureProperties}; use std::io::{Read, Seek}; use std::sync::Arc; @@ -47,47 +45,13 @@ impl FlatGeobufReaderBuilder { Ok(Self { reader }) } - fn infer_from_header(&self) -> Result<(NativeType, SchemaRef, Arc)> { - use Dimension::*; - - let header = self.reader.header(); - if header.has_m() | header.has_t() | header.has_tm() { - return Err(GeoArrowError::General( - "Only XY and XYZ dimensions are supported".to_string(), - )); - } - let has_z = header.has_z(); - - let properties_schema = infer_schema(header); - let geometry_type = header.geometry_type(); - let array_metadata = parse_crs(header.crs()); - // TODO: pass through arg - let coord_type = CoordType::Interleaved; - let data_type = match (geometry_type, has_z) { - (GeometryType::Point, false) => NativeType::Point(coord_type, XY), - (GeometryType::LineString, false) => NativeType::LineString(coord_type, XY), - (GeometryType::Polygon, false) => NativeType::Polygon(coord_type, XY), - (GeometryType::MultiPoint, false) => NativeType::MultiPoint(coord_type, XY), - (GeometryType::MultiLineString, false) => NativeType::MultiLineString(coord_type, XY), - (GeometryType::MultiPolygon, false) => NativeType::MultiPolygon(coord_type, XY), - (GeometryType::Point, true) => NativeType::Point(coord_type, XYZ), - (GeometryType::LineString, true) => NativeType::LineString(coord_type, XYZ), - (GeometryType::Polygon, true) => NativeType::Polygon(coord_type, XYZ), - (GeometryType::MultiPoint, true) => NativeType::MultiPoint(coord_type, XYZ), - (GeometryType::MultiLineString, true) => NativeType::MultiLineString(coord_type, XYZ), - (GeometryType::MultiPolygon, true) => NativeType::MultiPolygon(coord_type, XYZ), - (GeometryType::Unknown, _) => NativeType::Geometry(coord_type), - _ => panic!("Unsupported type"), - }; - Ok((data_type, properties_schema, array_metadata)) - } - /// Read features sequentially, without using `Seek` pub fn read_seq( self, options: FlatGeobufReaderOptions, ) -> Result> { - let (data_type, properties_schema, array_metadata) = self.infer_from_header()?; + let (data_type, properties_schema, array_metadata) = + infer_from_header(self.reader.header())?; if let Some((min_x, min_y, max_x, max_y)) = options.bbox { let selection = self.reader.select_bbox_seq(min_x, min_y, max_x, max_y)?; let num_rows = selection.features_count(); @@ -117,7 +81,8 @@ impl FlatGeobufReaderBuilder { impl FlatGeobufReaderBuilder { /// Read features pub fn read(self, options: FlatGeobufReaderOptions) -> Result> { - let (data_type, properties_schema, array_metadata) = self.infer_from_header()?; + let (data_type, properties_schema, array_metadata) = + infer_from_header(self.reader.header())?; if let Some((min_x, min_y, max_x, max_y)) = options.bbox { let selection = self.reader.select_bbox(min_x, min_y, max_x, max_y)?; let num_rows = selection.features_count();