Skip to content

Commit

Permalink
Async iterator compiles
Browse files Browse the repository at this point in the history
  • Loading branch information
kylebarron committed Dec 24, 2024
1 parent a9dad02 commit abdd4ac
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 84 deletions.
2 changes: 1 addition & 1 deletion rust/geoarrow/src/io/flatgeobuf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ mod reader;
mod writer;

#[cfg(feature = "flatgeobuf_async")]
pub use reader::read_flatgeobuf_async;
pub use reader::{read_flatgeobuf_async, FlatGeobufStreamBuilder, FlatGeobufStreamReader};
pub use reader::{FlatGeobufReader, FlatGeobufReaderBuilder, FlatGeobufReaderOptions};
pub use writer::{write_flatgeobuf, write_flatgeobuf_with_options, FlatGeobufWriterOptions};
195 changes: 155 additions & 40 deletions rust/geoarrow/src/io/flatgeobuf/reader/async.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::{Schema, SchemaRef};
use async_stream::stream;
use flatgeobuf::{AsyncFeatureIter, FgbFeature, GeometryType, HttpFgbReader};
use futures::future::BoxFuture;
use flatgeobuf::{AsyncFeatureIter, GeometryType, HttpFgbReader};
use futures::task::{Context, Poll};
use futures::Stream;
use geozero::{FeatureProcessor, FeatureProperties};
use http_range_client::{AsyncBufferedHttpRangeClient, AsyncHttpRangeClient};
Expand All @@ -16,7 +17,9 @@ use crate::array::metadata::ArrayMetadata;
use crate::array::*;
use crate::datatypes::{Dimension, NativeType};
use crate::error::{GeoArrowError, Result};
use crate::io::flatgeobuf::reader::common::{infer_schema, parse_crs, FlatGeobufReaderOptions};
use crate::io::flatgeobuf::reader::common::{
infer_from_header, infer_schema, parse_crs, FlatGeobufReaderOptions,
};
use crate::io::flatgeobuf::reader::object_store_reader::ObjectStoreWrapper;
use crate::io::geozero::array::GeometryStreamBuilder;
use crate::io::geozero::table::{GeoTableBuilder, GeoTableBuilderOptions};
Expand All @@ -28,52 +31,62 @@ pub struct FlatGeobufStreamBuilder<T: AsyncHttpRangeClient> {
}

impl<T: AsyncHttpRangeClient> FlatGeobufStreamBuilder<T> {
// pub async fn open(reader: T) -> Result<Self> {
// HttpFgbReader::new(client)
// todo!()
// }

pub async fn open_existing(reader: AsyncBufferedHttpRangeClient<T>) -> Result<Self> {
/// Create a new [FlatGeobufStreamBuilder] from an [AsyncBufferedHttpRangeClient]
pub async fn new(reader: AsyncBufferedHttpRangeClient<T>) -> Result<Self> {
let reader = HttpFgbReader::new(reader).await.unwrap();
Ok(Self { reader })
}

pub async fn read(self) {
let features = self.reader.select_all().await.unwrap();
/// Create a new [FlatGeobufStreamBuilder] directly from a client.
pub async fn open_with(reader: T, url: &str) -> Result<Self> {
let client = AsyncBufferedHttpRangeClient::with(reader, url);
Self::new(client).await
}

/// Read from the FlatGeobuf file
pub async fn read(self, options: FlatGeobufReaderOptions) -> Result<FlatGeobufStreamReader<T>> {
let (data_type, properties_schema, array_metadata) =
infer_from_header(self.reader.header())?;
if let Some((min_x, min_y, max_x, max_y)) = options.bbox {
let selection = self.reader.select_bbox(min_x, min_y, max_x, max_y).await?;
let num_rows = selection.features_count();
Ok(FlatGeobufStreamReader {
selection,
data_type,
batch_size: options.batch_size.unwrap_or(65_536),
properties_schema,
num_rows_remaining: num_rows,
array_metadata,
})
} else {
let selection = self.reader.select_all().await?;
let num_rows = selection.features_count();
Ok(FlatGeobufStreamReader {
selection,
data_type,
batch_size: options.batch_size.unwrap_or(65_536),
properties_schema,
num_rows_remaining: num_rows,
array_metadata,
})
}
}
}

impl FlatGeobufStreamBuilder<ObjectStoreWrapper> {
pub async fn open_store(
reader: Arc<dyn ObjectStore>,
location: Path,
options: FlatGeobufReaderOptions,
) -> Result<Self> {
let head = reader.head(&location).await?;

/// Create a [FlatGeobufStreamBuilder] from an [ObjectStore] instance.
pub async fn open_store(store: Arc<dyn ObjectStore>, location: Path) -> Result<Self> {
let head = store.head(&location).await?;
let object_store_wrapper = ObjectStoreWrapper {
reader,
reader: store,
location,
size: head.size,
};

let async_client = AsyncBufferedHttpRangeClient::with(object_store_wrapper, "");

Self::open_existing(async_client).await
Self::new(async_client).await
}
}

enum StreamState {
/// At the start of a new row group, or the end of the FlatGeobuf stream
Init,
/// Decoding a batch
Decoding,
/// Reading data from input
Reading(BoxFuture<'static, ()>),
/// Error
Error,
}

/// An iterator over record batches from a FlatGeobuf file.
///
/// This implements [arrow_array::RecordBatchReader], which you can use to access data.
Expand All @@ -84,11 +97,11 @@ pub struct FlatGeobufStreamReader<T: AsyncHttpRangeClient> {
properties_schema: SchemaRef,
num_rows_remaining: Option<usize>,
array_metadata: Arc<ArrayMetadata>,
state: StreamState,
}

impl<T: AsyncHttpRangeClient> FlatGeobufStreamReader<T> {
fn schema(&self) -> SchemaRef {
/// Access the schema of the batches emitted from this stream.
pub fn schema(&self) -> SchemaRef {
let geom_field =
self.data_type
.to_field_with_metadata("geometry", true, &self.array_metadata);
Expand All @@ -100,11 +113,113 @@ impl<T: AsyncHttpRangeClient> FlatGeobufStreamReader<T> {
))
}

fn into_stream(self) -> impl Stream<Item = Result<Option<FgbFeature>>> {
stream! {
for await value in self.selection {
yield value
fn construct_options(&self) -> GeoTableBuilderOptions {
let coord_type = self.data_type.coord_type();
let mut batch_size = self.batch_size;
if let Some(num_rows_remaining) = self.num_rows_remaining {
batch_size = batch_size.min(num_rows_remaining);
}
GeoTableBuilderOptions::new(
coord_type,
false,
Some(batch_size),
Some(self.properties_schema.clone()),
self.num_rows_remaining,
self.array_metadata.clone(),
)
}

async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
let options = self.construct_options();
let batch_size = options.batch_size;

macro_rules! impl_read {
($builder:expr) => {{
let mut row_count = 0;
loop {
if row_count >= batch_size {
let (batches, _schema) = $builder.finish()?.into_inner();
assert_eq!(batches.len(), 1);
return Ok(Some(batches.into_iter().next().unwrap()));
}

if let Some(feature) = self.selection.next().await? {
feature.process_properties(&mut $builder)?;
$builder.properties_end()?;

$builder.push_geometry(feature.geometry_trait()?.as_ref())?;

$builder.feature_end(0)?;
row_count += 1;
} else {
return Ok(None);
}
}
}};
}

match self.data_type {
NativeType::Point(_, dim) => {
let mut builder = GeoTableBuilder::<PointBuilder>::new_with_options(dim, options);
impl_read!(builder)
}
NativeType::LineString(_, dim) => {
let mut builder =
GeoTableBuilder::<LineStringBuilder>::new_with_options(dim, options);
impl_read!(builder)
}
NativeType::Polygon(_, dim) => {
let mut builder = GeoTableBuilder::<PolygonBuilder>::new_with_options(dim, options);
impl_read!(builder)
}
NativeType::MultiPoint(_, dim) => {
let mut builder =
GeoTableBuilder::<MultiPointBuilder>::new_with_options(dim, options);
impl_read!(builder)
}
NativeType::MultiLineString(_, dim) => {
let mut builder =
GeoTableBuilder::<MultiLineStringBuilder>::new_with_options(dim, options);
impl_read!(builder)
}
NativeType::MultiPolygon(_, dim) => {
let mut builder =
GeoTableBuilder::<MultiPolygonBuilder>::new_with_options(dim, options);
impl_read!(builder)
}
NativeType::Geometry(_) | NativeType::GeometryCollection(_, _) => {
let mut builder = GeoTableBuilder::<GeometryStreamBuilder>::new_with_options(
// TODO: I think this is unused? remove.
Dimension::XY,
options,
);
impl_read!(builder)
}
geom_type => Err(GeoArrowError::NotYetImplemented(format!(
"Parsing FlatGeobuf from {:?} geometry type not yet supported",
geom_type
))),
}
}
}

impl<T> Stream for FlatGeobufStreamReader<T>
where
T: AsyncHttpRangeClient + Unpin + Send,
{
type Item = Result<RecordBatch>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let future = self.next_batch();
futures::pin_mut!(future);

match future.poll(cx) {
Poll::Ready(Ok(Some(feature))) => Poll::Ready(Some(Ok(feature))),
Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
// End of stream
Poll::Ready(Ok(None)) => Poll::Ready(None),
// Still waiting
Poll::Pending => Poll::Pending,
}
}
}
Expand Down
40 changes: 39 additions & 1 deletion rust/geoarrow/src/io/flatgeobuf/reader/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ use std::collections::HashMap;
use std::sync::Arc;

use arrow_schema::{DataType, Field, SchemaBuilder, SchemaRef, TimeUnit};
use flatgeobuf::{ColumnType, Crs, Header};
use flatgeobuf::{ColumnType, Crs, GeometryType, Header};
use serde_json::Value;

use crate::array::metadata::{ArrayMetadata, CRSType};
use crate::array::CoordType;
use crate::datatypes::{Dimension, NativeType};
use crate::error::{GeoArrowError, Result};

/// Options for the FlatGeobuf reader
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -101,3 +103,39 @@ pub(super) fn parse_crs(crs: Option<Crs<'_>>) -> Arc<ArrayMetadata> {

Default::default()
}

pub(super) fn infer_from_header(
header: Header<'_>,
) -> Result<(NativeType, SchemaRef, Arc<ArrayMetadata>)> {
use Dimension::*;

if header.has_m() | header.has_t() | header.has_tm() {
return Err(GeoArrowError::General(
"Only XY and XYZ dimensions are supported".to_string(),
));
}
let has_z = header.has_z();

let properties_schema = infer_schema(header);
let geometry_type = header.geometry_type();
let array_metadata = parse_crs(header.crs());
// TODO: pass through arg
let coord_type = CoordType::Interleaved;
let data_type = match (geometry_type, has_z) {
(GeometryType::Point, false) => NativeType::Point(coord_type, XY),
(GeometryType::LineString, false) => NativeType::LineString(coord_type, XY),
(GeometryType::Polygon, false) => NativeType::Polygon(coord_type, XY),
(GeometryType::MultiPoint, false) => NativeType::MultiPoint(coord_type, XY),
(GeometryType::MultiLineString, false) => NativeType::MultiLineString(coord_type, XY),
(GeometryType::MultiPolygon, false) => NativeType::MultiPolygon(coord_type, XY),
(GeometryType::Point, true) => NativeType::Point(coord_type, XYZ),
(GeometryType::LineString, true) => NativeType::LineString(coord_type, XYZ),
(GeometryType::Polygon, true) => NativeType::Polygon(coord_type, XYZ),
(GeometryType::MultiPoint, true) => NativeType::MultiPoint(coord_type, XYZ),
(GeometryType::MultiLineString, true) => NativeType::MultiLineString(coord_type, XYZ),
(GeometryType::MultiPolygon, true) => NativeType::MultiPolygon(coord_type, XYZ),
(GeometryType::Unknown, _) => NativeType::Geometry(coord_type),
_ => panic!("Unsupported type"),
};
Ok((data_type, properties_schema, array_metadata))
}
2 changes: 1 addition & 1 deletion rust/geoarrow/src/io/flatgeobuf/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ mod sync;

pub use common::FlatGeobufReaderOptions;
#[cfg(feature = "flatgeobuf_async")]
pub use r#async::read_flatgeobuf_async;
pub use r#async::{read_flatgeobuf_async, FlatGeobufStreamBuilder, FlatGeobufStreamReader};
pub use sync::{FlatGeobufReader, FlatGeobufReaderBuilder};
47 changes: 6 additions & 41 deletions rust/geoarrow/src/io/flatgeobuf/reader/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@ use crate::array::metadata::ArrayMetadata;
use crate::array::*;
use crate::datatypes::{Dimension, NativeType};
use crate::error::{GeoArrowError, Result};
use crate::io::flatgeobuf::reader::common::{infer_schema, parse_crs, FlatGeobufReaderOptions};
use crate::io::flatgeobuf::reader::common::{infer_from_header, FlatGeobufReaderOptions};
use crate::io::geozero::array::GeometryStreamBuilder;
use crate::io::geozero::table::{GeoTableBuilder, GeoTableBuilderOptions};
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, Schema, SchemaRef};
use flatgeobuf::{
FallibleStreamingIterator, FeatureIter, FgbReader, GeometryType, NotSeekable, Seekable,
};
use flatgeobuf::{FallibleStreamingIterator, FeatureIter, FgbReader, NotSeekable, Seekable};
use geozero::{FeatureProcessor, FeatureProperties};
use std::io::{Read, Seek};
use std::sync::Arc;
Expand All @@ -47,47 +45,13 @@ impl<R: Read> FlatGeobufReaderBuilder<R> {
Ok(Self { reader })
}

fn infer_from_header(&self) -> Result<(NativeType, SchemaRef, Arc<ArrayMetadata>)> {
use Dimension::*;

let header = self.reader.header();
if header.has_m() | header.has_t() | header.has_tm() {
return Err(GeoArrowError::General(
"Only XY and XYZ dimensions are supported".to_string(),
));
}
let has_z = header.has_z();

let properties_schema = infer_schema(header);
let geometry_type = header.geometry_type();
let array_metadata = parse_crs(header.crs());
// TODO: pass through arg
let coord_type = CoordType::Interleaved;
let data_type = match (geometry_type, has_z) {
(GeometryType::Point, false) => NativeType::Point(coord_type, XY),
(GeometryType::LineString, false) => NativeType::LineString(coord_type, XY),
(GeometryType::Polygon, false) => NativeType::Polygon(coord_type, XY),
(GeometryType::MultiPoint, false) => NativeType::MultiPoint(coord_type, XY),
(GeometryType::MultiLineString, false) => NativeType::MultiLineString(coord_type, XY),
(GeometryType::MultiPolygon, false) => NativeType::MultiPolygon(coord_type, XY),
(GeometryType::Point, true) => NativeType::Point(coord_type, XYZ),
(GeometryType::LineString, true) => NativeType::LineString(coord_type, XYZ),
(GeometryType::Polygon, true) => NativeType::Polygon(coord_type, XYZ),
(GeometryType::MultiPoint, true) => NativeType::MultiPoint(coord_type, XYZ),
(GeometryType::MultiLineString, true) => NativeType::MultiLineString(coord_type, XYZ),
(GeometryType::MultiPolygon, true) => NativeType::MultiPolygon(coord_type, XYZ),
(GeometryType::Unknown, _) => NativeType::Geometry(coord_type),
_ => panic!("Unsupported type"),
};
Ok((data_type, properties_schema, array_metadata))
}

/// Read features sequentially, without using `Seek`
pub fn read_seq(
self,
options: FlatGeobufReaderOptions,
) -> Result<FlatGeobufReader<R, NotSeekable>> {
let (data_type, properties_schema, array_metadata) = self.infer_from_header()?;
let (data_type, properties_schema, array_metadata) =
infer_from_header(self.reader.header())?;
if let Some((min_x, min_y, max_x, max_y)) = options.bbox {
let selection = self.reader.select_bbox_seq(min_x, min_y, max_x, max_y)?;
let num_rows = selection.features_count();
Expand Down Expand Up @@ -117,7 +81,8 @@ impl<R: Read> FlatGeobufReaderBuilder<R> {
impl<R: Read + Seek> FlatGeobufReaderBuilder<R> {
/// Read features
pub fn read(self, options: FlatGeobufReaderOptions) -> Result<FlatGeobufReader<R, Seekable>> {
let (data_type, properties_schema, array_metadata) = self.infer_from_header()?;
let (data_type, properties_schema, array_metadata) =
infer_from_header(self.reader.header())?;
if let Some((min_x, min_y, max_x, max_y)) = options.bbox {
let selection = self.reader.select_bbox(min_x, min_y, max_x, max_y)?;
let num_rows = selection.features_count();
Expand Down

0 comments on commit abdd4ac

Please sign in to comment.