Skip to content

Commit

Permalink
Restore support from datetime columns in FlatGeobuf reading
Browse files Browse the repository at this point in the history
  • Loading branch information
kylebarron committed Nov 1, 2023
1 parent ab18f17 commit 11ce205
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 28 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ anyhow = "1"
arrow = { version = "48", features = ["ffi"] }
arrow-array = "48"
arrow-buffer = "48"
arrow-cast = "48"
arrow-data = "48"
arrow-ipc = "48"
arrow-schema = "48"
Expand Down
6 changes: 4 additions & 2 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Defines [`GeoArrowError`], representing all errors returned by this crate.
use std::fmt::Debug;
use arrow_schema::ArrowError;
use thiserror::Error;

/// Enum with all errors in this crate.
Expand All @@ -23,8 +24,9 @@ pub enum GeoArrowError {
#[error("Overflow")]
Overflow,

// #[error(transparent)]
// Arrow(#[from] ArrowError),
#[error(transparent)]
Arrow(#[from] ArrowError),

#[error(transparent)]
FailedToConvergeError(#[from] geo::vincenty_distance::FailedToConvergeError),

Expand Down
19 changes: 14 additions & 5 deletions src/io/flatgeobuf/anyvalue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ use arrow_array::builder::{
UInt8Builder,
};
use arrow_array::Array;
use arrow_schema::{DataType, TimeUnit};
use geozero::ColumnValue;

use crate::error::Result;

// Types implemented by FlatGeobuf
#[derive(Debug)]
#[allow(dead_code)]
pub enum AnyMutableArray {
Bool(BooleanBuilder),
Int8(Int8Builder),
Expand Down Expand Up @@ -91,9 +93,9 @@ impl AnyMutableArray {
}
}

pub fn finish(self) -> Arc<dyn Array> {
pub fn finish(self) -> Result<Arc<dyn Array>> {
use AnyMutableArray::*;
match self {
let arr: Arc<dyn Array> = match self {
Bool(arr) => Arc::new(arr.finish_cloned()),
Int8(arr) => Arc::new(arr.finish_cloned()),
Uint8(arr) => Arc::new(arr.finish_cloned()),
Expand All @@ -108,9 +110,16 @@ impl AnyMutableArray {
String(arr) => Arc::new(arr.finish_cloned()),
Json(arr) => Arc::new(arr.finish_cloned()),
// TODO: how to support timezones? Or is this always naive tz?
DateTime(_arr) => todo!(), // arrow2::compute::cast::utf8_to_naive_timestamp_ns(&arr.into()).Arced(),
DateTime(arr) => {
let string_arr = arr.finish_cloned();
arrow_cast::cast(
&string_arr,
&DataType::Timestamp(TimeUnit::Microsecond, None),
)?
}
Binary(arr) => Arc::new(arr.finish_cloned()),
}
};
Ok(arr)
}
}

Expand Down
40 changes: 20 additions & 20 deletions src/io/flatgeobuf/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use crate::array::MutablePointArray;
use crate::array::*;
use crate::error::Result;
use crate::io::flatgeobuf::anyvalue::AnyMutableArray;
use crate::table::GeoTable;
use crate::trait_::MutableGeometryArray;
Expand All @@ -30,7 +31,7 @@ use arrow_array::builder::{
UInt8Builder,
};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema};
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use flatgeobuf::{ColumnType, GeometryType};
use flatgeobuf::{FgbReader, Header};
use geozero::{FeatureProcessor, GeomProcessor, PropertyProcessor};
Expand All @@ -46,14 +47,14 @@ macro_rules! define_table_builder {
}

impl $name {
pub fn finish(self) -> GeoTable {
pub fn finish(self) -> Result<GeoTable> {
// Set geometry column after property columns
let geometry_column_index = self.columns.len();

let mut columns = Vec::with_capacity(self.columns.len() + 1);

for mut_column in self.columns {
columns.push(mut_column.finish())
columns.push(mut_column.finish()?)
}

// Add geometry column and geometry field
Expand All @@ -73,7 +74,7 @@ macro_rules! define_table_builder {
let new_schema = Arc::new(Schema::new(fields));

let batch = RecordBatch::try_new(new_schema.clone(), columns).unwrap();
GeoTable::try_new(new_schema, vec![batch], geometry_column_index).unwrap()
GeoTable::try_new(new_schema, vec![batch], geometry_column_index)
}
}

Expand Down Expand Up @@ -400,14 +401,14 @@ impl MultiPolygonTableBuilder {
}
}

pub fn read_flatgeobuf<R: Read + Seek>(file: &mut R) -> GeoTable {
/// Read a FlatGeobuf file to a GeoTable
pub fn read_flatgeobuf<R: Read + Seek>(file: &mut R) -> Result<GeoTable> {
let mut reader = FgbReader::open(file).unwrap().select_all().unwrap();

let header = reader.header();
let features_count = reader.features_count();

let (schema, initialized_columns) = infer_schema_and_init_columns(header, features_count);
dbg!(header.geometry_type());

match header.geometry_type() {
GeometryType::Point => {
Expand Down Expand Up @@ -439,7 +440,6 @@ pub fn read_flatgeobuf<R: Read + Seek>(file: &mut R) -> GeoTable {
builder.finish()
}
GeometryType::MultiPolygon => {
dbg!("GeometryType::MultiPolygon");
let mut builder =
MultiPolygonTableBuilder::new(schema, initialized_columns, features_count);
reader.process_features(&mut builder).unwrap();
Expand Down Expand Up @@ -518,16 +518,17 @@ fn infer_schema_and_init_columns(
Field::new(col.name(), DataType::Utf8, col.nullable()),
AnyMutableArray::Json(StringBuilder::with_capacity(features_count, features_count)),
),
ColumnType::DateTime => todo!(),
// Field::new(
// col.name(),
// DataType::Timestamp(TimeUnit::Nanosecond, None),
// col.nullable(),
// ),
// AnyMutableArray::DateTime(StringBuilder::with_capacity(
// features_count,
// features_count,
// )),
ColumnType::DateTime => (
Field::new(
col.name(),
DataType::Timestamp(TimeUnit::Microsecond, None),
col.nullable(),
),
AnyMutableArray::DateTime(StringBuilder::with_capacity(
features_count,
features_count,
)),
),
ColumnType::Binary => (
Field::new(col.name(), DataType::Binary, col.nullable()),
BinaryBuilder::with_capacity(features_count, features_count).into(),
Expand All @@ -554,15 +555,14 @@ mod test {
#[test]
fn test_countries() {
let mut filein = BufReader::new(File::open("fixtures/flatgeobuf/countries.fgb").unwrap());
let _table = read_flatgeobuf(&mut filein);
let _table = read_flatgeobuf(&mut filein).unwrap();
}

#[ignore = "datetime attribute parsing not yet implemented"]
#[test]
fn test_nz_buildings() {
let mut filein = BufReader::new(
File::open("fixtures/flatgeobuf/nz-building-outlines-small.fgb").unwrap(),
);
let _table = read_flatgeobuf(&mut filein);
let _table = read_flatgeobuf(&mut filein).unwrap();
}
}
2 changes: 1 addition & 1 deletion src/io/flatgeobuf/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ mod test {
write_flatgeobuf(&mut table, writer, "name").unwrap();

let mut reader = Cursor::new(output_buffer);
let new_table = read_flatgeobuf(&mut reader);
let new_table = read_flatgeobuf(&mut reader).unwrap();

// TODO: it looks like it's getting read back in backwards row order!
let batch = &new_table.batches()[0];
Expand Down

0 comments on commit 11ce205

Please sign in to comment.