From 6ecc5cf33e2c6339fec4f5a307e65e2658cb901d Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Fri, 13 Dec 2024 18:08:42 -0500 Subject: [PATCH] Add GeoHash functions to datafusion --- Cargo.lock | 13 +- rust/geodatafusion/Cargo.toml | 1 + rust/geodatafusion/README.md | 6 +- rust/geodatafusion/src/error.rs | 4 + .../src/udf/native/io/geohash.rs | 327 ++++++++++++++++++ rust/geodatafusion/src/udf/native/io/mod.rs | 4 + 6 files changed, 351 insertions(+), 4 deletions(-) create mode 100644 rust/geodatafusion/src/udf/native/io/geohash.rs diff --git a/Cargo.lock b/Cargo.lock index d6102c4b..c5761dd4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1864,6 +1864,7 @@ dependencies = [ "geo 0.29.3", "geo-traits", "geoarrow", + "geohash", "thiserror", "tokio", ] @@ -1877,6 +1878,16 @@ dependencies = [ "libm", ] +[[package]] +name = "geohash" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fb94b1a65401d6cbf22958a9040aa364812c26674f841bee538b12c135db1e6" +dependencies = [ + "geo-types", + "libm", +] + [[package]] name = "geojson" version = "0.24.1" @@ -4154,7 +4165,7 @@ dependencies = [ "cfg-if", "libc", "psm", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/rust/geodatafusion/Cargo.toml b/rust/geodatafusion/Cargo.toml index 360928ed..2f99ee6b 100644 --- a/rust/geodatafusion/Cargo.toml +++ b/rust/geodatafusion/Cargo.toml @@ -24,6 +24,7 @@ arrow-schema = "53.3" async-stream = { version = "0.3", optional = true } async-trait = { version = "0.1", optional = true } geo = "0.29.3" +geohash = "0.13.1" geo-traits = "0.2" geoarrow = { path = "../geoarrow", features = ["flatgeobuf"] } thiserror = "1" diff --git a/rust/geodatafusion/README.md b/rust/geodatafusion/README.md index 474e84a9..e6897d77 100644 --- a/rust/geodatafusion/README.md +++ b/rust/geodatafusion/README.md @@ -166,7 +166,7 @@ Spatial extensions for [Apache DataFusion](https://datafusion.apache.org/), an e | Name | Implemented | Description | | -------------------------- | ----------- | ------------------------------------------------------------------------------------------------------ | -| ST_Box2dFromGeoHash | | Return a BOX2D from a GeoHash string. | +| ST_Box2dFromGeoHash | ✅ | Return a BOX2D from a GeoHash string. | | ST_GeomFromGeoHash | | Return a geometry from a GeoHash string. | | ST_GeomFromGML | | Takes as input GML representation of geometry and outputs a PostGIS geometry object | | ST_GeomFromGeoJSON | | Takes as input a geojson representation of a geometry and outputs a PostGIS geometry object | @@ -174,7 +174,7 @@ Spatial extensions for [Apache DataFusion](https://datafusion.apache.org/), an e | ST_GeomFromTWKB | | Creates a geometry instance from a TWKB ("Tiny Well-Known Binary") geometry representation. | | ST_GMLToSQL | | Return a specified ST_Geometry value from GML representation. This is an alias name for ST_GeomFromGML | | ST_LineFromEncodedPolyline | | Creates a LineString from an Encoded Polyline. | -| ST_PointFromGeoHash | | Return a point from a GeoHash string. | +| ST_PointFromGeoHash | ✅ | Return a point from a GeoHash string. | | ST_FromFlatGeobufToTable | | Creates a table based on the structure of FlatGeobuf data. | | ST_FromFlatGeobuf | | Reads FlatGeobuf data. | @@ -212,7 +212,7 @@ Spatial extensions for [Apache DataFusion](https://datafusion.apache.org/), an e | ST_AsSVG | | Returns SVG path data for a geometry. | | ST_AsTWKB | | Returns the geometry as TWKB, aka "Tiny Well-Known Binary" | | ST_AsX3D | | Returns a Geometry in X3D xml node element format: ISO-IEC-19776-1.2-X3DEncodings-XML | -| ST_GeoHash | | Return a GeoHash representation of the geometry. | +| ST_GeoHash | ✅ | Return a GeoHash representation of the geometry. | ### Operators diff --git a/rust/geodatafusion/src/error.rs b/rust/geodatafusion/src/error.rs index a219e01e..f8921794 100644 --- a/rust/geodatafusion/src/error.rs +++ b/rust/geodatafusion/src/error.rs @@ -17,6 +17,9 @@ pub(crate) enum GeoDataFusionError { #[error(transparent)] GeoArrow(#[from] GeoArrowError), + + #[error(transparent)] + GeoHash(#[from] geohash::GeohashError), } /// Crate-specific result type. @@ -28,6 +31,7 @@ impl From for DataFusionError { GeoDataFusionError::Arrow(err) => DataFusionError::ArrowError(err, None), GeoDataFusionError::DataFusion(err) => err, GeoDataFusionError::GeoArrow(err) => DataFusionError::External(Box::new(err)), + GeoDataFusionError::GeoHash(err) => DataFusionError::External(Box::new(err)), } } } diff --git a/rust/geodatafusion/src/udf/native/io/geohash.rs b/rust/geodatafusion/src/udf/native/io/geohash.rs new file mode 100644 index 00000000..b9e2e58a --- /dev/null +++ b/rust/geodatafusion/src/udf/native/io/geohash.rs @@ -0,0 +1,327 @@ +use std::any::Any; +use std::sync::{Arc, OnceLock}; + +use arrow::array::{AsArray, StringBuilder}; +use arrow_schema::DataType; +use datafusion::logical_expr::scalar_doc_sections::DOC_SECTION_OTHER; +use datafusion::logical_expr::{ + ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, +}; +use geo_traits::PointTrait; +use geoarrow::array::{CoordType, PointArray, PointBuilder, RectBuilder}; +use geoarrow::datatypes::Dimension; +use geoarrow::trait_::{ArrayAccessor, NativeScalar}; +use geoarrow::ArrayBase; + +use crate::data_types::{BOX2D_TYPE, POINT2D_TYPE}; +use crate::error::GeoDataFusionResult; + +#[derive(Debug)] +pub(super) struct Box2DFromGeoHash { + signature: Signature, +} + +impl Box2DFromGeoHash { + pub fn new() -> Self { + Self { + signature: Signature::exact(vec![DataType::Utf8], Volatility::Immutable), + } + } +} + +static BOX_FROM_GEOHASH_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for Box2DFromGeoHash { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_box2dfromgeohash" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> datafusion::error::Result { + Ok(BOX2D_TYPE.into()) + } + + fn invoke(&self, args: &[ColumnarValue]) -> datafusion::error::Result { + Ok(box_from_geohash_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(BOX_FROM_GEOHASH_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Return a BOX2D from a GeoHash string.", + "ST_Box2dFromGeoHash(geohash)", + ) + .with_argument("text", "geohash") + .build() + })) + } +} + +fn box_from_geohash_impl(args: &[ColumnarValue]) -> GeoDataFusionResult { + let array = ColumnarValue::values_to_arrays(args)? + .into_iter() + .next() + .unwrap(); + + let string_array = array.as_string::(); + let mut builder = + RectBuilder::with_capacity_and_options(Dimension::XY, array.len(), Default::default()); + + for s in string_array.iter() { + builder.push_rect(s.map(geohash::decode_bbox).transpose()?.as_ref()); + } + + Ok(builder.finish().into_array_ref().into()) +} + +#[derive(Debug)] +pub(super) struct PointFromGeoHash { + signature: Signature, +} + +impl PointFromGeoHash { + pub fn new() -> Self { + Self { + signature: Signature::exact(vec![DataType::Utf8], Volatility::Immutable), + } + } +} + +static POINT_FROM_GEOHASH_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for PointFromGeoHash { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_pointfromgeohash" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> datafusion::error::Result { + Ok(POINT2D_TYPE.into()) + } + + fn invoke(&self, args: &[ColumnarValue]) -> datafusion::error::Result { + Ok(point_from_geohash_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(POINT_FROM_GEOHASH_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Return a point from a GeoHash string. The point represents the center point of the GeoHash.", + "ST_PointFromGeoHash(geohash)", + ) + .with_argument("text", "geohash") + .build() + })) + } +} + +fn point_from_geohash_impl(args: &[ColumnarValue]) -> GeoDataFusionResult { + let array = ColumnarValue::values_to_arrays(args)? + .into_iter() + .next() + .unwrap(); + + let string_array = array.as_string::(); + let mut builder = PointBuilder::with_capacity_and_options( + Dimension::XY, + array.len(), + CoordType::Separated, + Default::default(), + ); + + for s in string_array.iter() { + if let Some(s) = s { + let (coord, _, _) = geohash::decode(s)?; + builder.push_coord(Some(&coord)); + } else { + builder.push_null(); + } + } + + Ok(builder.finish().into_array_ref().into()) +} + +#[derive(Debug)] +pub(super) struct GeoHash { + signature: Signature, +} + +impl GeoHash { + pub fn new() -> Self { + Self { + signature: Signature::exact(vec![POINT2D_TYPE.into()], Volatility::Immutable), + } + } +} + +static GEOHASH_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for GeoHash { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_geohash" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> datafusion::error::Result { + Ok(DataType::Utf8) + } + + fn invoke(&self, args: &[ColumnarValue]) -> datafusion::error::Result { + Ok(geohash_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(GEOHASH_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Computes a GeoHash representation of a geometry. A GeoHash encodes a geographic Point into a text form that is sortable and searchable based on prefixing. A shorter GeoHash is a less precise representation of a point. It can be thought of as a box that contains the point.", + "ST_GeoHash(point)", + ) + .with_argument("geom", "geometry") + .build() + })) + } +} + +fn geohash_impl(args: &[ColumnarValue]) -> GeoDataFusionResult { + let array = ColumnarValue::values_to_arrays(args)? + .into_iter() + .next() + .unwrap(); + let point_array = PointArray::try_from((array.as_ref(), Dimension::XY))?; + + let mut builder = StringBuilder::with_capacity(array.len(), 0); + + for point in point_array.iter() { + if let Some(point) = point { + let coord = point.coord().unwrap(); + // TODO: make arg + // 12 is the max length supported by rust geohash. We should document this and maybe + // clamp numbers to 12. + let s = geohash::encode(coord.to_geo(), 12)?; + builder.append_value(s); + } else { + builder.append_null(); + } + } + + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) +} + +#[cfg(test)] +mod test { + use approx::relative_eq; + use arrow::array::AsArray; + use datafusion::prelude::*; + use geo_traits::{CoordTrait, PointTrait, RectTrait}; + use geoarrow::array::{PointArray, RectArray}; + use geoarrow::datatypes::Dimension; + use geoarrow::trait_::ArrayAccessor; + + use crate::data_types::{BOX2D_TYPE, POINT2D_TYPE}; + use crate::udf::native::register_native; + + #[tokio::test] + async fn test_box2d_from_geohash() { + let ctx = SessionContext::new(); + register_native(&ctx); + + let out = ctx + .sql("SELECT ST_Box2dFromGeoHash('ww8p1r4t8');") + .await + .unwrap(); + + let batches = out.collect().await.unwrap(); + assert_eq!(batches.len(), 1); + + let batch = batches.into_iter().next().unwrap(); + assert_eq!(batch.columns().len(), 1); + + assert!(batch + .schema() + .field(0) + .data_type() + .equals_datatype(&BOX2D_TYPE.into())); + + let rect_array = RectArray::try_from((batch.columns()[0].as_ref(), Dimension::XY)).unwrap(); + let rect = rect_array.value(0); + + assert!(relative_eq!(rect.min().x(), 112.55836486816406)); + assert!(relative_eq!(rect.min().y(), 37.83236503601074)); + assert!(relative_eq!(rect.max().x(), 112.5584077835083)); + assert!(relative_eq!(rect.max().y(), 37.83240795135498)); + } + + #[tokio::test] + async fn test_point_from_geohash() { + let ctx = SessionContext::new(); + register_native(&ctx); + + let out = ctx + .sql("SELECT ST_PointFromGeoHash('9qqj');") + .await + .unwrap(); + + let batches = out.collect().await.unwrap(); + assert_eq!(batches.len(), 1); + + let batch = batches.into_iter().next().unwrap(); + assert_eq!(batch.columns().len(), 1); + + assert!(batch + .schema() + .field(0) + .data_type() + .equals_datatype(&POINT2D_TYPE.into())); + + let point_array = + PointArray::try_from((batch.columns()[0].as_ref(), Dimension::XY)).unwrap(); + let point = point_array.value(0); + + assert!(relative_eq!(point.coord().unwrap().x(), -115.13671875)); + assert!(relative_eq!(point.coord().unwrap().y(), 36.123046875)); + } + + #[tokio::test] + async fn test_geohash() { + let ctx = SessionContext::new(); + register_native(&ctx); + + let out = ctx + .sql("SELECT ST_GeoHash( ST_Point(-126,48) );") + .await + .unwrap(); + + let batches = out.collect().await.unwrap(); + assert_eq!(batches.len(), 1); + + let batch = batches.into_iter().next().unwrap(); + assert_eq!(batch.columns().len(), 1); + + let arr = batch.columns()[0].as_string::(); + assert_eq!(arr.value(0), "c0w3hf1s70w3"); + } +} diff --git a/rust/geodatafusion/src/udf/native/io/mod.rs b/rust/geodatafusion/src/udf/native/io/mod.rs index bbe293c8..63d63aad 100644 --- a/rust/geodatafusion/src/udf/native/io/mod.rs +++ b/rust/geodatafusion/src/udf/native/io/mod.rs @@ -1,5 +1,6 @@ //! Geometry Input and Output +mod geohash; mod union_example; mod wkb; mod wkt; @@ -8,6 +9,9 @@ use datafusion::prelude::SessionContext; /// Register all provided functions for geometry input and output pub fn register_udfs(ctx: &SessionContext) { + ctx.register_udf(geohash::Box2DFromGeoHash::new().into()); + ctx.register_udf(geohash::GeoHash::new().into()); + ctx.register_udf(geohash::PointFromGeoHash::new().into()); ctx.register_udf(wkb::AsBinary::new().into()); ctx.register_udf(wkb::GeomFromWKB::new().into()); ctx.register_udf(wkt::AsText::new().into());