From 3835461614b8f04bc65df1d752ae81fe02f6d50b Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 12 Dec 2024 22:30:28 -0500 Subject: [PATCH] Update wkb/wkt functions --- rust/geodatafusion/src/udf/geo/mod.rs | 8 -- rust/geodatafusion/src/udf/geo/wkb.rs | 61 ------------ rust/geodatafusion/src/udf/geo/wkt.rs | 54 ---------- rust/geodatafusion/src/udf/io/mod.rs | 14 +++ rust/geodatafusion/src/udf/io/wkb.rs | 137 ++++++++++++++++++++++++++ rust/geodatafusion/src/udf/io/wkt.rs | 137 ++++++++++++++++++++++++++ 6 files changed, 288 insertions(+), 123 deletions(-) delete mode 100644 rust/geodatafusion/src/udf/geo/wkb.rs delete mode 100644 rust/geodatafusion/src/udf/geo/wkt.rs create mode 100644 rust/geodatafusion/src/udf/io/mod.rs create mode 100644 rust/geodatafusion/src/udf/io/wkb.rs create mode 100644 rust/geodatafusion/src/udf/io/wkt.rs diff --git a/rust/geodatafusion/src/udf/geo/mod.rs b/rust/geodatafusion/src/udf/geo/mod.rs index 81f787aa..c037a961 100644 --- a/rust/geodatafusion/src/udf/geo/mod.rs +++ b/rust/geodatafusion/src/udf/geo/mod.rs @@ -8,16 +8,12 @@ mod coord_dim; mod envelope; mod linear_ref; mod measurement; -mod wkb; -mod wkt; pub use centroid::centroid; pub use convex_hull::convex_hull; pub use coord_dim::coord_dim; pub use envelope::envelope; pub use r#box::{box_2d, xmax, xmin, ymax, ymin}; -pub use wkb::{as_binary, from_wkb}; -pub use wkt::{as_text, from_text}; use datafusion::prelude::SessionContext; @@ -26,14 +22,10 @@ pub fn register_geo(ctx: &SessionContext) { constructors::register_constructors(ctx); measurement::register_measurement(ctx); - ctx.register_udf(as_binary()); - ctx.register_udf(as_text()); ctx.register_udf(centroid()); ctx.register_udf(convex_hull()); ctx.register_udf(coord_dim()); ctx.register_udf(envelope()); - ctx.register_udf(from_text()); - ctx.register_udf(from_wkb()); // Box functions ctx.register_udf(box_2d()); diff --git a/rust/geodatafusion/src/udf/geo/wkb.rs b/rust/geodatafusion/src/udf/geo/wkb.rs deleted file mode 100644 index 0a48169c..00000000 --- a/rust/geodatafusion/src/udf/geo/wkb.rs +++ /dev/null @@ -1,61 +0,0 @@ -use std::sync::Arc; - -use arrow::array::AsArray; -use arrow_schema::DataType; -use datafusion::error::DataFusionError; -use datafusion::logical_expr::{ColumnarValue, ScalarUDF, Volatility}; -use datafusion::prelude::create_udf; -use geoarrow::array::{AsNativeArray, CoordType, GeometryArray, WKBArray}; -use geoarrow::datatypes::NativeType; -use geoarrow::io::wkb::to_wkb; -use geoarrow::{ArrayBase, NativeArray}; - -use crate::udf::geo::util::geometry_data_type; - -/// Returns the Well-Known Binary representation of the geometry. -/// -/// There are 2 variants of the function. The first variant takes no endian encoding paramater and -/// defaults to little endian. The second variant takes a second argument denoting the encoding - -/// using little-endian ('NDR') or big-endian ('XDR') encoding. -pub fn as_binary() -> ScalarUDF { - let udf = Arc::new(|args: &[ColumnarValue]| { - let args = ColumnarValue::values_to_arrays(args)?; - let arg = args.into_iter().next().unwrap(); - let geom_arr = GeometryArray::try_from(arg.as_ref()).unwrap(); - let wkb_arr = to_wkb::(geom_arr.as_ref()); - Ok(ColumnarValue::from(wkb_arr.into_array_ref())) - }); - - create_udf( - "st_asbinary", - vec![geometry_data_type()], - DataType::Binary, - Volatility::Immutable, - udf, - ) -} - -pub fn from_wkb() -> ScalarUDF { - let udf = Arc::new(|args: &[ColumnarValue]| { - let args = ColumnarValue::values_to_arrays(args)?; - let arg = args.into_iter().next().unwrap(); - let wkb_arr = WKBArray::new(arg.as_binary::().clone(), Default::default()); - let native_arr = geoarrow::io::wkb::from_wkb( - &wkb_arr, - NativeType::Geometry(CoordType::Separated), - false, - ) - .map_err(|err| DataFusionError::External(Box::new(err)))?; - Ok(ColumnarValue::from( - native_arr.as_ref().as_geometry().to_array_ref(), - )) - }); - - create_udf( - "st_geomfromwkb", - vec![DataType::Binary], - geometry_data_type(), - Volatility::Immutable, - udf, - ) -} diff --git a/rust/geodatafusion/src/udf/geo/wkt.rs b/rust/geodatafusion/src/udf/geo/wkt.rs deleted file mode 100644 index 7f27e3c7..00000000 --- a/rust/geodatafusion/src/udf/geo/wkt.rs +++ /dev/null @@ -1,54 +0,0 @@ -use std::sync::Arc; - -use arrow::array::AsArray; -use arrow_schema::DataType; -use datafusion::logical_expr::{ColumnarValue, ScalarUDF, Volatility}; -use datafusion::prelude::create_udf; -use geoarrow::array::{AsNativeArray, CoordType, WKTArray}; -use geoarrow::io::wkt::ToWKT; -use geoarrow::{ArrayBase, NativeArray}; - -use crate::error::GeoDataFusionResult; -use crate::udf::geo::util::{geometry_data_type, parse_single_arg_to_geometry_array}; - -/// ST_AsText -/// -/// - Return the Well-Known Text (WKT) representation of the geometry/geography without SRID metadata. -pub fn as_text() -> ScalarUDF { - create_udf( - "st_astext", - vec![geometry_data_type()], - DataType::Utf8, - Volatility::Immutable, - Arc::new(|args: &[ColumnarValue]| Ok(_as_text(args)?)), - ) -} - -fn _as_text(args: &[ColumnarValue]) -> GeoDataFusionResult { - let geom_arr = parse_single_arg_to_geometry_array(args)?; - let wkt_arr = geom_arr.as_ref().to_wkt::()?; - Ok(wkt_arr.into_array_ref().into()) -} - -/// ST_GeomFromText -/// -/// - Return a specified ST_Geometry value from Well-Known Text representation (WKT). -pub fn from_text() -> ScalarUDF { - create_udf( - "st_geomfromtext", - vec![DataType::Utf8], - geometry_data_type(), - Volatility::Immutable, - Arc::new(|args: &[ColumnarValue]| Ok(_from_text(args)?)), - ) -} - -fn _from_text(args: &[ColumnarValue]) -> GeoDataFusionResult { - let arg = ColumnarValue::values_to_arrays(args)? - .into_iter() - .next() - .unwrap(); - let wkt_arr = WKTArray::new(arg.as_string::().clone(), Default::default()); - let native_arr = geoarrow::io::wkt::read_wkt(&wkt_arr, CoordType::Separated, false)?; - Ok(native_arr.as_ref().as_geometry().to_array_ref().into()) -} diff --git a/rust/geodatafusion/src/udf/io/mod.rs b/rust/geodatafusion/src/udf/io/mod.rs new file mode 100644 index 00000000..09d6811b --- /dev/null +++ b/rust/geodatafusion/src/udf/io/mod.rs @@ -0,0 +1,14 @@ +//! Geometry Input and Output + +mod wkb; +mod wkt; + +use datafusion::prelude::SessionContext; + +/// Register all provided functions for geometry input and output +pub fn register_io(ctx: &SessionContext) { + ctx.register_udf(wkb::AsBinary::new().into()); + ctx.register_udf(wkb::GeomFromWKB::new().into()); + ctx.register_udf(wkt::AsText::new().into()); + ctx.register_udf(wkt::GeomFromText::new().into()); +} diff --git a/rust/geodatafusion/src/udf/io/wkb.rs b/rust/geodatafusion/src/udf/io/wkb.rs new file mode 100644 index 00000000..7bda68fa --- /dev/null +++ b/rust/geodatafusion/src/udf/io/wkb.rs @@ -0,0 +1,137 @@ +use std::any::Any; +use std::sync::OnceLock; + +use arrow::array::AsArray; +use arrow_schema::DataType; +use datafusion::logical_expr::scalar_doc_sections::DOC_SECTION_OTHER; +use datafusion::logical_expr::{ + ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, +}; +use geoarrow::array::{CoordType, WKBArray}; +use geoarrow::datatypes::NativeType; +use geoarrow::io::wkb::{from_wkb, to_wkb}; +use geoarrow::ArrayBase; + +use crate::data_types::{any_single_geometry_type_input, parse_to_native_array, GEOMETRY_TYPE}; +use crate::error::GeoDataFusionResult; + +#[derive(Debug)] +pub(super) struct AsBinary { + signature: Signature, +} + +impl AsBinary { + pub fn new() -> Self { + // TODO: extend to allow specifying little/big endian + Self { + signature: any_single_geometry_type_input(), + } + } +} + +static AS_BINARY_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for AsBinary { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_asbinary" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> datafusion::error::Result { + Ok(DataType::Binary) + } + + fn invoke(&self, args: &[ColumnarValue]) -> datafusion::error::Result { + Ok(as_binary_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(AS_BINARY_DOC.get_or_init(|| { + Documentation::builder() + .with_doc_section(DOC_SECTION_OTHER) + .with_description( + "Returns the OGC/ISO Well-Known Binary (WKB) representation of the geometry.", + ) + .with_argument("g1", "geometry") + .build() + .unwrap() + })) + } +} + +fn as_binary_impl(args: &[ColumnarValue]) -> GeoDataFusionResult { + let array = ColumnarValue::values_to_arrays(args)? + .into_iter() + .next() + .unwrap(); + let native_array = parse_to_native_array(array)?; + let wkb_arr = to_wkb::(native_array.as_ref()); + Ok(wkb_arr.into_array_ref().into()) +} + +#[derive(Debug)] +pub(super) struct GeomFromWKB { + signature: Signature, +} + +impl GeomFromWKB { + pub fn new() -> Self { + Self { + signature: Signature::coercible(vec![DataType::Binary], Volatility::Immutable), + } + } +} + +static GEOM_FROM_WKB_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for GeomFromWKB { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_geomfromwkb" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> datafusion::error::Result { + Ok(GEOMETRY_TYPE.into()) + } + + fn invoke(&self, args: &[ColumnarValue]) -> datafusion::error::Result { + Ok(geom_from_wkb_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(GEOM_FROM_WKB_DOC.get_or_init(|| { + Documentation::builder() + .with_doc_section(DOC_SECTION_OTHER) + .with_description( + "Takes a well-known binary representation of a geometry and a Spatial Reference System ID (SRID) and creates an instance of the appropriate geometry type", + ) + .with_argument("geom", "WKB buffers") + .build() + .unwrap() + })) + } +} + +fn geom_from_wkb_impl(args: &[ColumnarValue]) -> GeoDataFusionResult { + let array = ColumnarValue::values_to_arrays(args)? + .into_iter() + .next() + .unwrap(); + let wkb_arr = WKBArray::new(array.as_binary::().clone(), Default::default()); + let native_arr = from_wkb(&wkb_arr, NativeType::Geometry(CoordType::Separated), false)?; + Ok(native_arr.to_array_ref().into()) +} diff --git a/rust/geodatafusion/src/udf/io/wkt.rs b/rust/geodatafusion/src/udf/io/wkt.rs new file mode 100644 index 00000000..1d46fb85 --- /dev/null +++ b/rust/geodatafusion/src/udf/io/wkt.rs @@ -0,0 +1,137 @@ +use std::any::Any; +use std::sync::OnceLock; + +use arrow::array::AsArray; +use arrow_schema::DataType; +use datafusion::logical_expr::scalar_doc_sections::DOC_SECTION_OTHER; +use datafusion::logical_expr::{ + ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, +}; +use geoarrow::array::{CoordType, WKTArray}; +use geoarrow::io::wkt::{read_wkt, ToWKT}; +use geoarrow::ArrayBase; + +use crate::data_types::{any_single_geometry_type_input, parse_to_native_array, GEOMETRY_TYPE}; +use crate::error::GeoDataFusionResult; + +#[derive(Debug)] +pub(super) struct AsText { + signature: Signature, +} + +impl AsText { + pub fn new() -> Self { + // TODO: extend to allow specifying little/big endian + Self { + signature: any_single_geometry_type_input(), + } + } +} + +static AS_TEXT_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for AsText { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_astext" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> datafusion::error::Result { + Ok(DataType::Utf8) + } + + fn invoke(&self, args: &[ColumnarValue]) -> datafusion::error::Result { + Ok(as_text_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(AS_TEXT_DOC.get_or_init(|| { + Documentation::builder() + .with_doc_section(DOC_SECTION_OTHER) + .with_description( + "Returns the OGC Well-Known Text (WKT) representation of the geometry/geography.", + ) + .with_argument("g1", "geometry") + .build() + .unwrap() + })) + } +} + +fn as_text_impl(args: &[ColumnarValue]) -> GeoDataFusionResult { + let array = ColumnarValue::values_to_arrays(args)? + .into_iter() + .next() + .unwrap(); + let native_array = parse_to_native_array(array)?; + let wkt_arr = native_array.as_ref().to_wkt::()?; + Ok(wkt_arr.into_array_ref().into()) +} + +#[derive(Debug)] +pub(super) struct GeomFromText { + signature: Signature, +} + +impl GeomFromText { + pub fn new() -> Self { + // TODO: extend to allow specifying little/big endian + Self { + signature: Signature::coercible(vec![DataType::Utf8], Volatility::Immutable), + } + } +} + +static GEOM_FROM_TEXT_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for GeomFromText { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_astext" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> datafusion::error::Result { + Ok(GEOMETRY_TYPE.into()) + } + + fn invoke(&self, args: &[ColumnarValue]) -> datafusion::error::Result { + Ok(geom_from_text_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(GEOM_FROM_TEXT_DOC.get_or_init(|| { + Documentation::builder() + .with_doc_section(DOC_SECTION_OTHER) + .with_description( + "Constructs a geometry object from the OGC Well-Known text representation.", + ) + .with_argument("g1", "geometry") + .build() + .unwrap() + })) + } +} + +fn geom_from_text_impl(args: &[ColumnarValue]) -> GeoDataFusionResult { + let array = ColumnarValue::values_to_arrays(args)? + .into_iter() + .next() + .unwrap(); + let wkt_arr = WKTArray::new(array.as_string::().clone(), Default::default()); + let native_arr = read_wkt(&wkt_arr, CoordType::Separated, false)?; + Ok(native_arr.to_array_ref().into()) +}