From 30fb0c6678c774dc74839fbc657cb1fb1885518d Mon Sep 17 00:00:00 2001
From: aykut-bozkurt <51649454+aykut-bozkurt@users.noreply.github.com>
Date: Tue, 21 Jan 2025 02:00:36 +0300
Subject: [PATCH] Adds basic geoparquet support (#94)

We already write postgis geometry columns as WKB-formatted binary, as
specified by the [geoparquet spec](https://geoparquet.org/releases/v1.1.0/).
With this PR, we also write basic geoparquet metadata into the key-value
metadata of the parquet file. We support only the basic info required by the
spec, which is enough to be interoperable with
[duckdb](https://github.com/duckdb/duckdb/blob/2e533ec9dfaa82baac24ae1104e501af723a565a/extension/parquet/geo_parquet.cpp#L179).
See the notes after the patch for a sketch of reading this metadata back.
---
 Cargo.lock                            |   2 +
 Cargo.toml                            |   2 +
 README.md                             |   2 +-
 src/arrow_parquet/parquet_writer.rs   |  51 +++++++----
 src/pgrx_tests/copy_type_roundtrip.rs | 117 +++++++++++++++++++++++-
 src/type_compat/geometry.rs           | 126 +++++++++++++++++++++++++-
 6 files changed, 279 insertions(+), 21 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index bfe857f..67ce14a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2477,6 +2477,8 @@ dependencies = [
  "pgrx",
  "pgrx-tests",
  "rust-ini",
+ "serde",
+ "serde_json",
  "tokio",
  "url",
 ]

diff --git a/Cargo.toml b/Cargo.toml
index c545672..3ec97ef 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -41,6 +41,8 @@ parquet = {version = "53", default-features = false, features = [
 ]}
 pgrx = "=0.12.9"
 rust-ini = "0.21"
+serde = "1"
+serde_json = "1"
 tokio = {version = "1", default-features = false, features = ["rt", "time", "macros"]}
 url = "2"

diff --git a/README.md b/README.md
index 1190d42..cb35f31 100644
--- a/README.md
+++ b/README.md
@@ -294,7 +294,7 @@ There is currently only one GUC parameter to enable/disable the `pg_parquet`:
 > * `numeric` is allowed by Postgres. (precision and scale not specified). These are represented by a default precision (38) and scale (9) instead of writing them as string. You get runtime error if your table tries to read or write a numeric value which is not allowed by the default precision and scale (29 integral digits before decimal point, 9 digits after decimal point).
 > - (2) The `date` type is represented according to `Unix epoch` when writing to Parquet files. It is converted back according to `PostgreSQL epoch` when reading from Parquet files.
 > - (3) The `timestamptz` and `timetz` types are adjusted to `UTC` when writing to Parquet files. They are converted back with `UTC` timezone when reading from Parquet files.
-> - (4) The `geometry` type is represented as `BYTE_ARRAY` encoded as `WKB` when `postgis` extension is created. Otherwise, it is represented as `BYTE_ARRAY` with `STRING` logical type.
+> - (4) The `geometry` type is represented as `BYTE_ARRAY` encoded as `WKB`, as specified by the [geoparquet spec](https://geoparquet.org/releases/v1.1.0/), when the `postgis` extension is created. Otherwise, it is represented as `BYTE_ARRAY` with `STRING` logical type.
 > - (5) `crunchy_map` is dependent on functionality provided by [Crunchy Bridge](https://www.crunchydata.com/products/crunchy-bridge). The `crunchy_map` type is represented as `GROUP` with `MAP` logical type when `crunchy_map` extension is created. Otherwise, it is represented as `BYTE_ARRAY` with `STRING` logical type.
 
 > [!WARNING]
diff --git a/src/arrow_parquet/parquet_writer.rs b/src/arrow_parquet/parquet_writer.rs
index 4f5713f..bf586bd 100644
--- a/src/arrow_parquet/parquet_writer.rs
+++ b/src/arrow_parquet/parquet_writer.rs
@@ -5,8 +5,9 @@ use arrow_schema::SchemaRef;
 use parquet::{
     arrow::{async_writer::ParquetObjectWriter, AsyncArrowWriter},
     file::properties::{EnabledStatistics, WriterProperties},
+    format::KeyValue,
 };
-use pgrx::{heap_tuple::PgHeapTuple, pg_sys::RECORDOID, AllocatedByRust, PgTupleDesc};
+use pgrx::{heap_tuple::PgHeapTuple, AllocatedByRust, PgTupleDesc};
 use url::Url;
 
 use crate::{
@@ -18,7 +19,10 @@ use crate::{
         uri_utils::parquet_writer_from_uri,
     },
     pgrx_utils::{collect_attributes_for, CollectAttributesFor},
-    type_compat::{geometry::reset_postgis_context, map::reset_map_context},
+    type_compat::{
+        geometry::{geoparquet_metadata_json_from_tupledesc, reset_postgis_context},
+        map::reset_map_context,
+    },
     PG_BACKEND_TOKIO_RUNTIME,
 };
 
@@ -42,25 +46,11 @@ impl ParquetWriterContext {
         compression_level: i32,
         tupledesc: &PgTupleDesc,
     ) -> ParquetWriterContext {
-        debug_assert!(tupledesc.oid() == RECORDOID);
-
         // Postgis and Map contexts are used throughout writing the parquet file.
         // We need to reset them to avoid reading the stale data. (e.g. extension could be dropped)
         reset_postgis_context();
         reset_map_context();
 
-        let writer_props = WriterProperties::builder()
-            .set_statistics_enabled(EnabledStatistics::Page)
-            .set_compression(
-                PgParquetCompressionWithLevel {
-                    compression,
-                    compression_level,
-                }
-                .into(),
-            )
-            .set_created_by("pg_parquet".to_string())
-            .build();
-
         let attributes = collect_attributes_for(CollectAttributesFor::CopyTo, tupledesc);
 
         pgrx::debug2!(
@@ -71,6 +61,8 @@
         let schema = parse_arrow_schema_from_attributes(&attributes);
         let schema = Arc::new(schema);
 
+        let writer_props = Self::writer_props(tupledesc, compression, compression_level);
+
         let parquet_writer = parquet_writer_from_uri(&uri, schema.clone(), writer_props);
 
         let attribute_contexts =
@@ -83,6 +75,33 @@
         }
     }
 
+    fn writer_props(
+        tupledesc: &PgTupleDesc,
+        compression: PgParquetCompression,
+        compression_level: i32,
+    ) -> WriterProperties {
+        let compression = PgParquetCompressionWithLevel {
+            compression,
+            compression_level,
+        };
+
+        let mut writer_props_builder = WriterProperties::builder()
+            .set_statistics_enabled(EnabledStatistics::Page)
+            .set_compression(compression.into())
+            .set_created_by("pg_parquet".to_string());
+
+        let geometry_columns_metadata_value = geoparquet_metadata_json_from_tupledesc(tupledesc);
+
+        if geometry_columns_metadata_value.is_some() {
+            let key_value_metadata = KeyValue::new("geo".into(), geometry_columns_metadata_value);
+
+            writer_props_builder =
+                writer_props_builder.set_key_value_metadata(Some(vec![key_value_metadata]));
+        }
+
+        writer_props_builder.build()
+    }
+
     pub(crate) fn write_new_row_group(
         &mut self,
         tuples: Vec<Option<PgHeapTuple<'static, AllocatedByRust>>>,
diff --git a/src/pgrx_tests/copy_type_roundtrip.rs b/src/pgrx_tests/copy_type_roundtrip.rs
index a5af2b1..f3c69fc 100644
--- a/src/pgrx_tests/copy_type_roundtrip.rs
+++ b/src/pgrx_tests/copy_type_roundtrip.rs
@@ -8,18 +8,20 @@ mod tests {
         LOCAL_TEST_FILE_PATH,
     };
     use crate::type_compat::fallback_to_text::FallbackToText;
-    use crate::type_compat::geometry::Geometry;
+    use crate::type_compat::geometry::{
+        Geometry, GeometryColumnsMetadata, GeometryEncoding, GeometryType,
+    };
     use crate::type_compat::map::Map;
     use crate::type_compat::pg_arrow_type_conversions::{
         DEFAULT_UNBOUNDED_NUMERIC_PRECISION, DEFAULT_UNBOUNDED_NUMERIC_SCALE,
     };
     use pgrx::pg_sys::Oid;
-    use pgrx::pg_test;
     use pgrx::{
         composite_type,
         datum::{Date, Time, TimeWithTimeZone, Timestamp, TimestampWithTimeZone},
         AnyNumeric, Spi,
     };
+    use pgrx::{pg_test, JsonB};
 
     #[pg_test]
     fn test_int2() {
@@ -946,6 +948,117 @@ mod tests {
         test_table.assert_expected_and_result_rows();
     }
 
+    #[pg_test]
+    fn test_geometry_geoparquet_metadata() {
+        // Skip the test if postgis extension is not available
+        if !extension_exists("postgis") {
+            return;
+        }
+
+        let query = "DROP EXTENSION IF EXISTS postgis; CREATE EXTENSION postgis;";
+        Spi::run(query).unwrap();
+
+        let copy_to_query = format!(
+            "COPY (SELECT ST_GeomFromText('POINT(1 1)')::geometry(point) as a,
+                          ST_GeomFromText('LINESTRING(0 0, 1 1)')::geometry(linestring) as b,
+                          ST_GeomFromText('POLYGON((0 0, 1 1, 2 2, 0 0))')::geometry(polygon) as c,
+                          ST_GeomFromText('MULTIPOINT((0 0), (1 1))')::geometry(multipoint) as d,
+                          ST_GeomFromText('MULTILINESTRING((0 0, 1 1), (2 2, 3 3))')::geometry(multilinestring) as e,
+                          ST_GeomFromText('MULTIPOLYGON(((0 0, 1 1, 2 2, 0 0)), ((3 3, 4 4, 5 5, 3 3)))')::geometry(multipolygon) as f,
+                          ST_GeomFromText('GEOMETRYCOLLECTION(POINT(1 1), LINESTRING(0 0, 1 1))')::geometry(geometrycollection) as g
+                   )
+             TO '{LOCAL_TEST_FILE_PATH}' WITH (format parquet);",
+        );
+        Spi::run(copy_to_query.as_str()).unwrap();
+
+        // Check geoparquet metadata
+        let geoparquet_metadata_query = format!(
+            "select encode(value, 'escape')::jsonb
+             from parquet.kv_metadata('{LOCAL_TEST_FILE_PATH}')
+             where encode(key, 'escape') = 'geo';",
+        );
+        let geoparquet_metadata_json = Spi::get_one::<JsonB>(geoparquet_metadata_query.as_str())
+            .unwrap()
+            .unwrap();
+
+        let geoparquet_metadata: GeometryColumnsMetadata =
+            serde_json::from_value(geoparquet_metadata_json.0).unwrap();
+
+        // assert common metadata
+        assert_eq!(geoparquet_metadata.version, "1.1.0");
+        assert_eq!(geoparquet_metadata.primary_column, "a");
+
+        // point
+        assert_eq!(
+            geoparquet_metadata.columns.get("a").unwrap().encoding,
+            GeometryEncoding::WKB
+        );
+        assert_eq!(
+            geoparquet_metadata.columns.get("a").unwrap().geometry_types,
+            vec![GeometryType::Point]
+        );
+
+        // linestring
+        assert_eq!(
+            geoparquet_metadata.columns.get("b").unwrap().encoding,
+            GeometryEncoding::WKB
+        );
+        assert_eq!(
+            geoparquet_metadata.columns.get("b").unwrap().geometry_types,
+            vec![GeometryType::LineString]
+        );
+
+        // polygon
+        assert_eq!(
+            geoparquet_metadata.columns.get("c").unwrap().encoding,
+            GeometryEncoding::WKB
+        );
+        assert_eq!(
+            geoparquet_metadata.columns.get("c").unwrap().geometry_types,
+            vec![GeometryType::Polygon]
+        );
+
+        // multipoint
+        assert_eq!(
+            geoparquet_metadata.columns.get("d").unwrap().encoding,
+            GeometryEncoding::WKB
+        );
+        assert_eq!(
+            geoparquet_metadata.columns.get("d").unwrap().geometry_types,
+            vec![GeometryType::MultiPoint]
+        );
+
+        // multilinestring
+        assert_eq!(
+            geoparquet_metadata.columns.get("e").unwrap().encoding,
+            GeometryEncoding::WKB
+        );
+        assert_eq!(
+            geoparquet_metadata.columns.get("e").unwrap().geometry_types,
+            vec![GeometryType::MultiLineString]
+        );
+
+        // multipolygon
+        assert_eq!(
+            geoparquet_metadata.columns.get("f").unwrap().encoding,
+            GeometryEncoding::WKB
+        );
+        assert_eq!(
+            geoparquet_metadata.columns.get("f").unwrap().geometry_types,
+            vec![GeometryType::MultiPolygon]
+        );
+
+        // geometrycollection
+        assert_eq!(
+            geoparquet_metadata.columns.get("g").unwrap().encoding,
+            GeometryEncoding::WKB
+        );
+        assert_eq!(
+            geoparquet_metadata.columns.get("g").unwrap().geometry_types,
+            vec![GeometryType::GeometryCollection]
+        );
+    }
+
     #[pg_test]
     fn test_complex_composite() {
         Spi::run("CREATE TYPE dog AS (name text, age int);").unwrap();
diff --git a/src/type_compat/geometry.rs b/src/type_compat/geometry.rs
index 128072d..94723ba 100644
--- a/src/type_compat/geometry.rs
+++ b/src/type_compat/geometry.rs
@@ -1,4 +1,4 @@
-use std::{ffi::CString, ops::Deref};
+use std::{collections::HashMap, ffi::CString, ops::Deref};
 
 use once_cell::sync::OnceCell;
 use pgrx::{
@@ -8,8 +8,11 @@ use pgrx::{
         InvalidOid, LookupFuncName, Oid, OidFunctionCall1Coll, SysCacheIdentifier::TYPENAMENSP,
         BYTEAOID,
     },
-    FromDatum, IntoDatum, PgList, Spi,
+    FromDatum, IntoDatum, PgList, PgTupleDesc, Spi,
 };
+use serde::{Deserialize, Serialize};
+
+use crate::pgrx_utils::{collect_attributes_for, CollectAttributesFor};
 
 // we need to reset the postgis context at each copy start
 static mut POSTGIS_CONTEXT: OnceCell<PostgisContext> = OnceCell::new();
@@ -45,6 +48,125 @@ pub(crate) fn is_postgis_geometry_type(typoid: Oid) -> bool {
     false
 }
 
+#[derive(Debug, PartialEq, Serialize, Deserialize)]
+pub(crate) enum GeometryType {
+    Point,
+    LineString,
+    Polygon,
+    MultiPoint,
+    MultiLineString,
+    MultiPolygon,
+    GeometryCollection,
+}
+
+impl GeometryType {
+    fn from_typmod(typmod: i32) -> Option<GeometryType> {
+        // see postgis: https://github.com/postgis/postgis/blob/2845d3f37896e64ad24a2ee6863213b297da1301/liblwgeom/liblwgeom.h.in#L194
+        let geom_type = (typmod & 0x000000FC) >> 2;
+
+        match geom_type {
+            1 => Some(GeometryType::Point),
+            2 => Some(GeometryType::LineString),
+            3 => Some(GeometryType::Polygon),
+            4 => Some(GeometryType::MultiPoint),
+            5 => Some(GeometryType::MultiLineString),
+            6 => Some(GeometryType::MultiPolygon),
+            7 => Some(GeometryType::GeometryCollection),
+            _ => None,
+        }
+    }
+}
+
+#[derive(Debug, PartialEq, Serialize, Deserialize)]
+pub(crate) enum GeometryEncoding {
+    // only WKB is supported for now
+    #[allow(clippy::upper_case_acronyms)]
+    WKB,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub(crate) struct GeometryColumn {
+    pub(crate) encoding: GeometryEncoding,
+    pub(crate) geometry_types: Vec<GeometryType>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub(crate) struct GeometryColumnsMetadata {
+    pub(crate) version: String,
+    pub(crate) primary_column: String,
+    pub(crate) columns: HashMap<String, GeometryColumn>,
+}
+
+impl GeometryColumnsMetadata {
+    fn from_tupledesc(tupledesc: &PgTupleDesc) -> Option<GeometryColumnsMetadata> {
+        let mut columns = HashMap::new();
+        let mut primary_column = String::new();
+
+        let attributes = collect_attributes_for(CollectAttributesFor::CopyTo, tupledesc);
+
+        for attribute in attributes {
+            let attribute_typoid = attribute.type_oid().value();
+
+            if !is_postgis_geometry_type(attribute_typoid) {
+                continue;
+            }
+
+            let typmod = attribute.type_mod();
+
+            let geometry_types = if let Some(geom_type) = GeometryType::from_typmod(typmod) {
+                vec![geom_type]
+            } else {
+                vec![]
+            };
+
+            let encoding = GeometryEncoding::WKB;
+
+            let geometry_column = GeometryColumn {
+                encoding,
+                geometry_types,
+            };
+
+            let column_name = attribute.name().to_string();
+
+            // we use the first geometry column as the primary column
+            if primary_column.is_empty() {
+                primary_column = column_name.clone();
+            }
+
+            columns.insert(column_name, geometry_column);
+        }
+
+        if columns.is_empty() {
+            return None;
+        }
+
+        Some(GeometryColumnsMetadata {
+            version: "1.1.0".into(),
+            primary_column,
+            columns,
+        })
+    }
+}
+
+// geoparquet_metadata_json_from_tupledesc returns metadata for the geometry columns
+// as a json string, in the format specified by https://geoparquet.org/releases/v1.1.0
+// e.g. "{\"version\":\"1.1.0\",
+//        \"primary_column\":\"a\",
+//        \"columns\":{\"a\":{\"encoding\":\"WKB\", \"geometry_types\":[\"Point\"]},
+//                     \"b\":{\"encoding\":\"WKB\", \"geometry_types\":[\"LineString\"]}}}"
+pub(crate) fn geoparquet_metadata_json_from_tupledesc(tupledesc: &PgTupleDesc) -> Option<String> {
+    let geometry_columns_metadata = GeometryColumnsMetadata::from_tupledesc(tupledesc);
+
+    geometry_columns_metadata.map(|metadata| {
+        serde_json::to_string(&metadata).unwrap_or_else(|_| {
+            panic!(
+                "failed to serialize geometry columns metadata {:?}",
+                metadata
+            )
+        })
+    })
+}
+
 #[derive(Debug, PartialEq, Clone)]
 struct PostgisContext {
     geometry_typoid: Option<Oid>,
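
---

Notes (not part of the patch):

A minimal read-back sketch of the `geo` key-value metadata this patch writes. It uses the same `parquet` crate (v53) the extension already depends on; the file path and the standalone `main` are placeholders for illustration, not code from this PR.

```rust
use std::fs::File;

use parquet::file::reader::{FileReader, SerializedFileReader};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // placeholder path: any file written by pg_parquet's COPY TO with a
    // postgis geometry column in it
    let file = File::open("/tmp/test.parquet")?;
    let reader = SerializedFileReader::new(file)?;

    // key-value metadata lives in the parquet file footer; the patch stores
    // the geoparquet document under the "geo" key
    if let Some(kvs) = reader.metadata().file_metadata().key_value_metadata() {
        for kv in kvs.iter().filter(|kv| kv.key == "geo") {
            // expected shape, per the doc comment in geometry.rs:
            // {"version":"1.1.0","primary_column":"a",
            //  "columns":{"a":{"encoding":"WKB","geometry_types":["Point"]}}}
            println!("{}", kv.value.as_deref().unwrap_or_default());
        }
    }

    Ok(())
}
```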
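A standalone sanity check of the typmod decoding used by `GeometryType::from_typmod`. The mask and shift are copied from the patch (geometry type code in bits 2..8 of the typmod, per the linked PostGIS header); the helper name and the sample typmod values are made up for the demo.

```rust
// re-implements the patch's shift-and-mask in isolation
fn geom_type_code_from_typmod(typmod: i32) -> i32 {
    (typmod & 0x000000FC) >> 2
}

fn main() {
    // type code 1 (Point) stored in bits 2..8
    assert_eq!(geom_type_code_from_typmod(1 << 2), 1);
    // type code 7 (GeometryCollection), with unrelated low bits set
    assert_eq!(geom_type_code_from_typmod((7 << 2) | 0b11), 7);
    // code 0 is not a known geometry type; from_typmod maps it to None
    assert_eq!(geom_type_code_from_typmod(0), 0);
    println!("typmod decoding checks passed");
}
```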