From 8e039ab24477e57f19cbc86016eb0a879ea6bd83 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 9 Dec 2024 22:50:38 -0500 Subject: [PATCH] Parquet expand glob pattern --- Cargo.lock | 1 + rust/geoarrow/Cargo.toml | 3 +- rust/geoarrow/src/io/parquet/mod.rs | 2 +- rust/geoarrow/src/io/parquet/reader/glob.rs | 112 ++++++++++++++++++++ rust/geoarrow/src/io/parquet/reader/mod.rs | 2 + 5 files changed, 118 insertions(+), 2 deletions(-) create mode 100644 rust/geoarrow/src/io/parquet/reader/glob.rs diff --git a/Cargo.lock b/Cargo.lock index f9c4d34f..e9d7eea4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1313,6 +1313,7 @@ dependencies = [ "geo-traits", "geos", "geozero", + "glob", "half", "http-range-client", "indexmap", diff --git a/rust/geoarrow/Cargo.toml b/rust/geoarrow/Cargo.toml index 1b0ba804..43ec5b96 100644 --- a/rust/geoarrow/Cargo.toml +++ b/rust/geoarrow/Cargo.toml @@ -23,7 +23,7 @@ flatgeobuf_async = [ gdal = ["dep:gdal"] geos = ["dep:geos"] ipc_compression = ["arrow-ipc/lz4", "arrow-ipc/zstd"] -parquet = ["dep:parquet"] +parquet = ["dep:parquet", "dep:glob"] parquet_async = [ "parquet", "parquet/async", @@ -69,6 +69,7 @@ geo-index = "0.1.1" geo-traits = "0.2" geos = { version = "9.0", features = ["v3_10_0", "geo"], optional = true } geozero = { version = "0.14", features = ["with-wkb"] } +glob = { version = "0.3.1", optional = true } half = { version = "2.4.1" } http-range-client = { version = "0.8", optional = true } indexmap = { version = "2" } diff --git a/rust/geoarrow/src/io/parquet/mod.rs b/rust/geoarrow/src/io/parquet/mod.rs index 598c1350..19b447c5 100644 --- a/rust/geoarrow/src/io/parquet/mod.rs +++ b/rust/geoarrow/src/io/parquet/mod.rs @@ -66,7 +66,7 @@ mod test; mod writer; pub use reader::{ - GeoParquetDatasetMetadata, GeoParquetReaderMetadata, GeoParquetReaderOptions, + expand_glob, GeoParquetDatasetMetadata, GeoParquetReaderMetadata, GeoParquetReaderOptions, GeoParquetRecordBatchReader, GeoParquetRecordBatchReaderBuilder, }; #[cfg(feature = 
"parquet_async")]
diff --git a/rust/geoarrow/src/io/parquet/reader/glob.rs b/rust/geoarrow/src/io/parquet/reader/glob.rs
new file mode 100644
index 00000000..f097add1
--- /dev/null
+++ b/rust/geoarrow/src/io/parquet/reader/glob.rs
@@ -0,0 +1,105 @@
+use std::sync::Arc;
+
+use futures::StreamExt;
+use glob::Pattern;
+use object_store::path::Path;
+use object_store::{ObjectMeta, ObjectStore};
+
+use crate::error::Result;
+
+/// Find all objects in `store` whose path matches `pattern` and, when given, ends with `suffix`.
+///
+/// The literal part of `pattern` preceding its first glob character (`?`, `*` or `[`), or the
+/// whole pattern if it has none, is used up to its last `/` as the listing prefix so that only
+/// the relevant part of the store is scanned. Every listed object is then checked against the
+/// full pattern.
+///
+/// # Errors
+///
+/// Returns the first error produced while listing the object store.
+pub async fn expand_glob(
+    store: Arc<dyn ObjectStore>,
+    pattern: &Pattern,
+    suffix: Option<&str>,
+) -> Result<Vec<ObjectMeta>> {
+    let prefix = list_prefix(pattern.as_str());
+
+    // Create the listing stream exactly once: calling `store.list` in the loop condition would
+    // restart the listing on every iteration and never advance past the first object.
+    let mut stream = store.list(prefix.as_ref());
+
+    let mut out = Vec::new();
+    while let Some(item) = stream.next().await {
+        let item = item?;
+        if item_matches(&item, pattern, suffix) {
+            out.push(item);
+        }
+    }
+
+    Ok(out)
+}
+
+/// Derive the object store listing prefix from the literal leading directories of `pattern`.
+///
+/// Returns `None` when no `/` precedes the first glob character, in which case the whole store
+/// has to be listed.
+fn list_prefix(pattern: &str) -> Option<Path> {
+    let literal_len = pattern.find(['?', '*', '[']).unwrap_or(pattern.len());
+    let (dir, _) = pattern[..literal_len].rsplit_once('/')?;
+    // An empty directory (pattern starting with `/`) is the same as listing without a prefix.
+    (!dir.is_empty()).then(|| Path::from(dir))
+}
+
+/// Whether the location of `item` matches `pattern` and ends with `suffix` (if any).
+fn item_matches(item: &ObjectMeta, pattern: &Pattern, suffix: Option<&str>) -> bool {
+    let location = item.location.as_ref();
+    pattern.matches(location) && suffix.map_or(true, |suffix| location.ends_with(suffix))
+}
+
+#[cfg(test)]
+mod test {
+    use std::sync::Arc;
+
+    use super::*;
+    use object_store::memory::InMemory;
+    use object_store::PutPayload;
+
+    /// Populate an in-memory store with empty objects at `paths`.
+    async fn store_with(paths: &[&str]) -> Arc<InMemory> {
+        let store = Arc::new(InMemory::new());
+        for path in paths {
+            store
+                .put(&Path::from(*path), PutPayload::new())
+                .await
+                .unwrap();
+        }
+        store
+    }
+
+    /// Sorted locations of the listed objects, for order-independent assertions.
+    fn locations(items: &[ObjectMeta]) -> Vec<String> {
+        let mut out: Vec<_> = items.iter().map(|item| item.location.to_string()).collect();
+        out.sort();
+        out
+    }
+
+    #[tokio::test]
+    async fn test_matches_path() {
+        let store = store_with(&["file1.txt", "file2.txt", "file3.txt", "other.csv"]).await;
+
+        let pattern = Pattern::new("file*.txt").unwrap();
+        let result = expand_glob(store, &pattern, Some(".txt")).await.unwrap();
+
+        assert_eq!(locations(&result), ["file1.txt", "file2.txt", "file3.txt"]);
+    }
+
+    #[tokio::test]
+    async fn test_matches_path_with_prefix() {
+        let store = store_with(&["data/a.parquet", "other/c.parquet"]).await;
+
+        let pattern = Pattern::new("data/*.parquet").unwrap();
+        let result = expand_glob(store, &pattern, None).await.unwrap();
+
+        assert_eq!(locations(&result), ["data/a.parquet"]);
+    }
+}
diff --git a/rust/geoarrow/src/io/parquet/reader/mod.rs b/rust/geoarrow/src/io/parquet/reader/mod.rs
index 5f8a5f6d..f8a5cde5 100644
--- a/rust/geoarrow/src/io/parquet/reader/mod.rs
+++ b/rust/geoarrow/src/io/parquet/reader/mod.rs
@@ -1,12 +1,14 @@
 #[cfg(feature = "parquet_async")]
 mod r#async;
 mod builder;
+mod glob;
 mod metadata;
 mod options;
 mod parse;
 mod spatial_filter;
 
 pub use builder::{GeoParquetRecordBatchReader, GeoParquetRecordBatchReaderBuilder};
+pub use glob::expand_glob;
 pub use metadata::{GeoParquetDatasetMetadata, GeoParquetReaderMetadata};
 pub use options::GeoParquetReaderOptions;
 #[cfg(feature = "parquet_async")]