diff --git a/Cargo.lock b/Cargo.lock index 88102bb3fdc8..f23c4345e26c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3955,8 +3955,7 @@ dependencies = [ [[package]] name = "object_store" version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cfccb68961a56facde1163f9319e0d15743352344e7808a11795fb99698dcaf" +source = "git+https://github.com/waynr/arrow-rs?rev=3c4a84376a25af362597dad66821ff55777fd370#3c4a84376a25af362597dad66821ff55777fd370" dependencies = [ "async-trait", "base64 0.22.1", diff --git a/Cargo.toml b/Cargo.toml index cc12d6b2e429..24f19cfd81b7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -190,3 +190,8 @@ large_futures = "warn" [workspace.lints.rust] unexpected_cfgs = { level = "warn", check-cfg = ["cfg(tarpaulin)"] } unused_qualifications = "deny" + +[patch.crates-io] +#object_store = { path = "../../../apache/arrow-rs/object_store" } +#object_store = { git = "https://github.com/waynr/arrow-rs", branch = "object_store/introduce-extensions" } +object_store = { git = "https://github.com/waynr/arrow-rs", rev = "3c4a84376a25af362597dad66821ff55777fd370" } diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index 123ecc2f9582..17ff74fcd775 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -26,13 +26,19 @@ use crate::datasource::file_format::file_compression_type::FileCompressionType; use crate::datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl}; use crate::{error::Result, scalar::ScalarValue}; use std::any::Any; -use std::fmt::Formatter; +use std::fmt::{Display, Formatter}; use std::{fmt, sync::Arc}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::stats::Precision; use datafusion_common::{ColumnStatistics, Constraints, Statistics}; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, Partitioning}; +use futures::stream::BoxStream; +use object_store::path::Path; +use object_store::{ + GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, + PutMultipartOpts, PutOptions, PutPayload, PutResult, +}; use crate::datasource::data_source::FileSource; pub use datafusion_datasource::file_scan_config::*; @@ -156,13 +162,98 @@ pub struct FileScanConfig { pub source: Arc, } +#[derive(Debug)] +struct ContextualizedObjectStore { + inner: Arc, + extensions: object_store::Extensions, +} + +impl ContextualizedObjectStore { + fn new(inner: Arc, extensions: object_store::Extensions) -> Self { + Self { inner, extensions } + } +} + +impl Display for ContextualizedObjectStore { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "ContextualizedObjectStore({})", self.inner) + } +} + +#[async_trait::async_trait] +impl ObjectStore for ContextualizedObjectStore { + async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> { + self.inner.copy(from, to).await + } + + async fn copy_if_not_exists( + &self, + from: &Path, + to: &Path, + ) -> object_store::Result<()> { + self.inner.copy_if_not_exists(from, to).await + } + + async fn delete(&self, location: &Path) -> object_store::Result<()> { + self.inner.delete(location).await + } + + async fn get_opts( + &self, + location: &Path, + mut options: GetOptions, + ) -> object_store::Result { + options.extensions = self.extensions.clone(); + self.inner.get_opts(location, options).await + } + + fn list( + &self, + prefix: Option<&Path>, + ) -> BoxStream<'_, object_store::Result> { + self.inner.list(prefix) + } + + async fn list_with_delimiter( + &self, + prefix: Option<&Path>, + ) -> object_store::Result { + self.inner.list_with_delimiter(prefix).await + } + + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOpts, + ) -> object_store::Result> { + self.inner.put_multipart_opts(location, opts).await + } + + async fn put_opts( + &self, + location: &Path, + payload: PutPayload, + opts: PutOptions, + ) -> object_store::Result { + self.inner.put_opts(location, payload, opts).await + } +} + impl DataSource for FileScanConfig { fn open( &self, partition: usize, context: Arc, ) -> Result { - let object_store = context.runtime_env().object_store(&self.object_store_url); + let object_store = context + .runtime_env() + .object_store(&self.object_store_url) + .map(|i| -> Arc { + Arc::new(ContextualizedObjectStore::new( + i, + context.session_config().clone_extensions_for_object_store(), + )) + }); let source = self .source diff --git a/datafusion/execution/src/config.rs b/datafusion/execution/src/config.rs index 53646dc5b468..b75a310b012a 100644 --- a/datafusion/execution/src/config.rs +++ b/datafusion/execution/src/config.rs @@ -571,6 +571,13 @@ impl SessionConfig { .cloned() .map(|ext| Arc::downcast(ext).expect("TypeId unique")) } + + /// Get ObjectStore-compatible extensions. + pub fn clone_extensions_for_object_store(&self) -> object_store::Extensions { + let exts: HashMap> = + self.extensions.clone().into_iter().collect(); + exts.into() + } } impl From for SessionConfig {