Skip to content

Commit

Permalink
Update wkb/wkt functions
Browse files Browse the repository at this point in the history
  • Loading branch information
kylebarron committed Dec 13, 2024
1 parent d775224 commit 3835461
Show file tree
Hide file tree
Showing 6 changed files with 288 additions and 123 deletions.
8 changes: 0 additions & 8 deletions rust/geodatafusion/src/udf/geo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,12 @@ mod coord_dim;
mod envelope;
mod linear_ref;
mod measurement;
mod wkb;
mod wkt;

pub use centroid::centroid;
pub use convex_hull::convex_hull;
pub use coord_dim::coord_dim;
pub use envelope::envelope;
pub use r#box::{box_2d, xmax, xmin, ymax, ymin};
pub use wkb::{as_binary, from_wkb};
pub use wkt::{as_text, from_text};

use datafusion::prelude::SessionContext;

Expand All @@ -26,14 +22,10 @@ pub fn register_geo(ctx: &SessionContext) {
constructors::register_constructors(ctx);
measurement::register_measurement(ctx);

ctx.register_udf(as_binary());
ctx.register_udf(as_text());
ctx.register_udf(centroid());
ctx.register_udf(convex_hull());
ctx.register_udf(coord_dim());
ctx.register_udf(envelope());
ctx.register_udf(from_text());
ctx.register_udf(from_wkb());

// Box functions
ctx.register_udf(box_2d());
Expand Down
61 changes: 0 additions & 61 deletions rust/geodatafusion/src/udf/geo/wkb.rs

This file was deleted.

54 changes: 0 additions & 54 deletions rust/geodatafusion/src/udf/geo/wkt.rs

This file was deleted.

14 changes: 14 additions & 0 deletions rust/geodatafusion/src/udf/io/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
//! Geometry Input and Output
mod wkb;
mod wkt;

use datafusion::prelude::SessionContext;

/// Register every geometry input/output UDF from this module on `ctx`.
///
/// The WKB functions are registered first, followed by the WKT functions.
pub fn register_io(ctx: &SessionContext) {
    // Well-Known Binary conversions.
    let as_binary = wkb::AsBinary::new();
    ctx.register_udf(as_binary.into());
    let geom_from_wkb = wkb::GeomFromWKB::new();
    ctx.register_udf(geom_from_wkb.into());

    // Well-Known Text conversions.
    let as_text = wkt::AsText::new();
    ctx.register_udf(as_text.into());
    let geom_from_text = wkt::GeomFromText::new();
    ctx.register_udf(geom_from_text.into());
}
137 changes: 137 additions & 0 deletions rust/geodatafusion/src/udf/io/wkb.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
use std::any::Any;
use std::sync::OnceLock;

use arrow::array::AsArray;
use arrow_schema::DataType;
use datafusion::logical_expr::scalar_doc_sections::DOC_SECTION_OTHER;
use datafusion::logical_expr::{
ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
};
use geoarrow::array::{CoordType, WKBArray};
use geoarrow::datatypes::NativeType;
use geoarrow::io::wkb::{from_wkb, to_wkb};
use geoarrow::ArrayBase;

use crate::data_types::{any_single_geometry_type_input, parse_to_native_array, GEOMETRY_TYPE};
use crate::error::GeoDataFusionResult;

/// Scalar UDF that serializes a geometry argument to a binary column.
#[derive(Debug)]
pub(super) struct AsBinary {
    /// Argument signature reported to the query planner.
    signature: Signature,
}

impl AsBinary {
    /// Create the UDF using the signature from `any_single_geometry_type_input`.
    pub fn new() -> Self {
        // TODO: extend to allow specifying little/big endian
        let signature = any_single_geometry_type_input();
        Self { signature }
    }
}

/// Lazily-built documentation for [`AsBinary`]; initialized on first request.
static AS_BINARY_DOC: OnceLock<Documentation> = OnceLock::new();

impl ScalarUDFImpl for AsBinary {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Name under which the function is exposed.
    fn name(&self) -> &str {
        "st_asbinary"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// The result is always `DataType::Binary`, regardless of the input types.
    fn return_type(&self, _arg_types: &[DataType]) -> datafusion::error::Result<DataType> {
        let output = DataType::Binary;
        Ok(output)
    }

    /// Delegates to `as_binary_impl`, converting its error into a DataFusion error.
    fn invoke(&self, args: &[ColumnarValue]) -> datafusion::error::Result<ColumnarValue> {
        let encoded = as_binary_impl(args)?;
        Ok(encoded)
    }

    fn documentation(&self) -> Option<&Documentation> {
        let doc = AS_BINARY_DOC.get_or_init(|| {
            let builder = Documentation::builder()
                .with_doc_section(DOC_SECTION_OTHER)
                .with_description(
                    "Returns the OGC/ISO Well-Known Binary (WKB) representation of the geometry.",
                )
                .with_argument("g1", "geometry");
            builder.build().unwrap()
        });
        Some(doc)
    }
}

fn as_binary_impl(args: &[ColumnarValue]) -> GeoDataFusionResult<ColumnarValue> {
let array = ColumnarValue::values_to_arrays(args)?
.into_iter()
.next()
.unwrap();
let native_array = parse_to_native_array(array)?;
let wkb_arr = to_wkb::<i32>(native_array.as_ref());
Ok(wkb_arr.into_array_ref().into())
}

/// Scalar UDF that builds geometries from a binary (WKB) argument.
#[derive(Debug)]
pub(super) struct GeomFromWKB {
    /// Argument signature reported to the query planner.
    signature: Signature,
}

impl GeomFromWKB {
    /// Create the UDF; it accepts one argument coercible to `DataType::Binary`.
    pub fn new() -> Self {
        let accepted = vec![DataType::Binary];
        let signature = Signature::coercible(accepted, Volatility::Immutable);
        Self { signature }
    }
}

/// Lazily-built documentation for [`GeomFromWKB`]; initialized on first request.
static GEOM_FROM_WKB_DOC: OnceLock<Documentation> = OnceLock::new();

impl ScalarUDFImpl for GeomFromWKB {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Name under which the function is exposed.
    fn name(&self) -> &str {
        "st_geomfromwkb"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// The result is always the crate's `GEOMETRY_TYPE`, regardless of input types.
    fn return_type(&self, _arg_types: &[DataType]) -> datafusion::error::Result<DataType> {
        Ok(GEOMETRY_TYPE.into())
    }

    /// Delegates to `geom_from_wkb_impl`, converting its error into a DataFusion error.
    fn invoke(&self, args: &[ColumnarValue]) -> datafusion::error::Result<ColumnarValue> {
        Ok(geom_from_wkb_impl(args)?)
    }

    fn documentation(&self) -> Option<&Documentation> {
        Some(GEOM_FROM_WKB_DOC.get_or_init(|| {
            Documentation::builder()
                .with_doc_section(DOC_SECTION_OTHER)
                // The signature accepts a single Binary argument, so the
                // description must not advertise an SRID parameter.
                .with_description(
                    "Takes a well-known binary representation of a geometry and creates an instance of the appropriate geometry type",
                )
                .with_argument("geom", "WKB buffers")
                .build()
                .unwrap()
        }))
    }
}

fn geom_from_wkb_impl(args: &[ColumnarValue]) -> GeoDataFusionResult<ColumnarValue> {
let array = ColumnarValue::values_to_arrays(args)?
.into_iter()
.next()
.unwrap();
let wkb_arr = WKBArray::new(array.as_binary::<i32>().clone(), Default::default());
let native_arr = from_wkb(&wkb_arr, NativeType::Geometry(CoordType::Separated), false)?;
Ok(native_arr.to_array_ref().into())
}
Loading

0 comments on commit 3835461

Please sign in to comment.