Skip to content

Commit 7edff96

Browse files
committed
Don't canonicalize ListingTableUrl (apache#7994)
1 parent 0d4dc36 commit 7edff96

File tree

7 files changed

+115
-104
lines changed

7 files changed

+115
-104
lines changed

datafusion-cli/src/exec.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -387,15 +387,7 @@ mod tests {
387387
// Ensure that local files are also registered
388388
let sql =
389389
format!("CREATE EXTERNAL TABLE test STORED AS PARQUET LOCATION '{location}'");
390-
let err = create_external_table_test(location, &sql)
391-
.await
392-
.unwrap_err();
393-
394-
if let DataFusionError::IoError(e) = err {
395-
assert_eq!(e.kind(), std::io::ErrorKind::NotFound);
396-
} else {
397-
return Err(err);
398-
}
390+
create_external_table_test(location, &sql).await.unwrap();
399391

400392
Ok(())
401393
}

datafusion/core/src/datasource/listing/table.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -855,14 +855,17 @@ impl TableProvider for ListingTable {
855855
let input_partitions = input.output_partitioning().partition_count();
856856
let writer_mode = match self.options.insert_mode {
857857
ListingTableInsertMode::AppendToFile => {
858-
if input_partitions > file_groups.len() {
858+
if file_groups.is_empty() && self.options.single_file {
859+
// This is a hack, longer term append should be split out (#7994)
860+
crate::datasource::file_format::write::FileWriterMode::PutMultipart
861+
} else if input_partitions > file_groups.len() {
859862
return plan_err!(
860863
"Cannot append {input_partitions} partitions to {} files!",
861864
file_groups.len()
862865
);
866+
} else {
867+
crate::datasource::file_format::write::FileWriterMode::Append
863868
}
864-
865-
crate::datasource::file_format::write::FileWriterMode::Append
866869
}
867870
ListingTableInsertMode::AppendNewFiles => {
868871
crate::datasource::file_format::write::FileWriterMode::PutMultipart

datafusion/core/src/datasource/listing/url.rs

Lines changed: 91 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::fs;
18+
use std::path::{Component, PathBuf};
1919

2020
use crate::datasource::object_store::ObjectStoreUrl;
2121
use crate::execution::context::SessionState;
@@ -46,37 +46,49 @@ pub struct ListingTableUrl {
4646
impl ListingTableUrl {
4747
/// Parse a provided string as a `ListingTableUrl`
4848
///
49+
/// A URL can either refer to a single object, or a collection of objects with a
50+
/// common prefix, with the presence of a trailing `/` indicating a collection.
51+
///
52+
/// For example, `file:///foo.txt` refers to the file at `/foo.txt`, whereas
53+
/// `file:///foo/` refers to all the files under the directory `/foo` and its
54+
/// subdirectories.
55+
///
56+
/// Similarly `s3://BUCKET/blob.csv` refers to `blob.csv` in the S3 bucket `BUCKET`,
57+
/// whereas `s3://BUCKET/foo/` refers to all objects with the prefix `foo/` in the
58+
/// S3 bucket `BUCKET`
59+
///
4960
/// # Paths without a Scheme
5061
///
5162
/// If no scheme is provided, or the string is an absolute filesystem path
52-
/// as determined [`std::path::Path::is_absolute`], the string will be
63+
/// as determined by [`std::path::Path::is_absolute`], the string will be
5364
/// interpreted as a path on the local filesystem using the operating
5465
/// system's standard path delimiter, i.e. `\` on Windows, `/` on Unix.
5566
///
5667
/// If the path contains any of `'?', '*', '['`, it will be considered
5768
/// a glob expression and resolved as described in the section below.
5869
///
59-
/// Otherwise, the path will be resolved to an absolute path, returning
60-
/// an error if it does not exist, and converted to a [file URI]
70+
/// Otherwise, the path will be resolved to an absolute path based on the current
71+
/// working directory, and converted to a [file URI].
6172
///
62-
/// If you wish to specify a path that does not exist on the local
63-
/// machine you must provide it as a fully-qualified [file URI]
64-
/// e.g. `file:///myfile.txt`
73+
/// If the path already exists in the local filesystem this will be used to determine if this
74+
/// [`ListingTableUrl`] refers to a collection or a single object, otherwise the presence
75+
/// of a trailing path delimiter will be used to indicate a directory. For the avoidance
76+
/// of ambiguity it is recommended users always include trailing `/` when intending to
77+
/// refer to a directory.
6578
///
6679
/// ## Glob File Paths
6780
///
6881
/// If no scheme is provided, and the path contains a glob expression, it will
6982
/// be resolved as follows.
7083
///
7184
/// The string up to the first path segment containing a glob expression will be extracted,
72-
/// and resolved in the same manner as a normal scheme-less path. That is, resolved to
73-
/// an absolute path on the local filesystem, returning an error if it does not exist,
74-
/// and converted to a [file URI]
85+
/// and resolved in the same manner as a normal scheme-less path above.
7586
///
7687
/// The remaining string will be interpreted as a [`glob::Pattern`] and used as a
7788
/// filter when listing files from object storage
7889
///
7990
/// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme
91+
/// [URL]: https://url.spec.whatwg.org/
8092
pub fn parse(s: impl AsRef<str>) -> Result<Self> {
8193
let s = s.as_ref();
8294

@@ -92,32 +104,6 @@ impl ListingTableUrl {
92104
}
93105
}
94106

95-
/// Get object store for specified input_url
96-
/// if input_url is actually not a url, we assume it is a local file path
97-
/// if we have a local path, create it if not exists so ListingTableUrl::parse works
98-
pub fn parse_create_local_if_not_exists(
99-
s: impl AsRef<str>,
100-
is_directory: bool,
101-
) -> Result<Self> {
102-
let s = s.as_ref();
103-
let is_valid_url = Url::parse(s).is_ok();
104-
105-
match is_valid_url {
106-
true => ListingTableUrl::parse(s),
107-
false => {
108-
let path = std::path::PathBuf::from(s);
109-
if !path.exists() {
110-
if is_directory {
111-
fs::create_dir_all(path)?;
112-
} else {
113-
fs::File::create(path)?;
114-
}
115-
}
116-
ListingTableUrl::parse(s)
117-
}
118-
}
119-
}
120-
121107
/// Creates a new [`ListingTableUrl`] interpreting `s` as a filesystem path
122108
fn parse_path(s: &str) -> Result<Self> {
123109
let (prefix, glob) = match split_glob_expression(s) {
@@ -129,15 +115,9 @@ impl ListingTableUrl {
129115
None => (s, None),
130116
};
131117

132-
let path = std::path::Path::new(prefix).canonicalize()?;
133-
let url = if path.is_dir() {
134-
Url::from_directory_path(path)
135-
} else {
136-
Url::from_file_path(path)
137-
}
138-
.map_err(|_| DataFusionError::Internal(format!("Can not open path: {s}")))?;
139-
// TODO: Currently we do not have an IO-related error variant that accepts ()
140-
// or a string. Once we have such a variant, change the error type above.
118+
let url = url_from_path(prefix).ok_or_else(|| {
119+
DataFusionError::Internal(format!("Can not open path: {s}"))
120+
})?;
141121
Ok(Self::new(url, glob))
142122
}
143123

@@ -214,7 +194,12 @@ impl ListingTableUrl {
214194
}
215195
}
216196
},
217-
false => futures::stream::once(store.head(&self.prefix)).boxed(),
197+
false => futures::stream::once(store.head(&self.prefix))
198+
.filter(|r| {
199+
let p = !matches!(r, Err(object_store::Error::NotFound { .. }));
200+
futures::future::ready(p)
201+
})
202+
.boxed(),
218203
};
219204
Ok(list
220205
.try_filter(move |meta| {
@@ -257,6 +242,45 @@ impl std::fmt::Display for ListingTableUrl {
257242
}
258243
}
259244

245+
/// Builds a [`Url`] from the filesystem path `s`, returning `None` on failure.
///
/// Whether the result is a directory or a file URL is decided as follows:
/// if `s` exists on the local filesystem, `Path::is_dir` is used; otherwise
/// the path is treated as a directory only when the last character of `s`
/// is a path separator. An empty `s` that does not exist yields `None`,
/// because there is no last character to inspect.
///
/// Relative paths are joined onto the current working directory, and the
/// resulting absolute path is normalized with `resolve_path` before being
/// handed to `Url::from_directory_path` or `Url::from_file_path`.
fn url_from_path(s: &str) -> Option<Url> {
246+
let path = std::path::Path::new(s);
247+
let is_dir = match path.exists() {
248+
true => path.is_dir(),
249+
// Fallback to inferring from trailing separator
250+
false => std::path::is_separator(s.chars().last()?),
251+
};
252+
253+
let p = match path.is_absolute() {
254+
true => resolve_path(path)?,
255+
false => {
256+
let absolute = std::env::current_dir().ok()?.join(path);
257+
resolve_path(&absolute)?
258+
}
259+
};
260+
261+
match is_dir {
262+
true => Url::from_directory_path(p).ok(),
263+
false => Url::from_file_path(p).ok(),
264+
}
265+
}
266+
267+
/// Normalizes `path` component by component, without calling into the filesystem.
///
/// Prefix, root and normal components are appended to the result, `.`
/// components are skipped, and each `..` removes the most recently
/// appended component via `PathBuf::pop`.
///
/// Returns `None` when a `..` cannot be applied because `pop` reports that
/// nothing could be removed, e.g. `/foo/bar/../../../baz.txt`.
fn resolve_path(path: &std::path::Path) -> Option<PathBuf> {
268+
let mut base = PathBuf::with_capacity(path.as_os_str().len());
269+
for component in path.components() {
270+
match component {
271+
Component::Prefix(_) | Component::RootDir => base.push(component.as_os_str()),
272+
Component::Normal(p) => base.push(p),
273+
Component::CurDir => {} // Do nothing
274+
Component::ParentDir => {
275+
if !base.pop() {
276+
return None;
277+
}
278+
}
279+
}
280+
}
281+
Some(base)
282+
}
283+
260284
const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];
261285

262286
/// Splits `path` at the first path segment containing a glob expression, returning
@@ -368,4 +392,25 @@ mod tests {
368392
Some(("/a/b/c//", "alltypes_plain*.parquet")),
369393
);
370394
}
395+
396+
// Checks `resolve_path` on three inputs: a `..` that removes the preceding
// component, a `.` that is dropped, and a path with more `..` components
// than can be removed, which must yield `None`.
#[test]
397+
fn test_resolve_path() {
398+
let r = resolve_path("/foo/bar/../baz.txt".as_ref()).unwrap();
399+
assert_eq!(r.to_str().unwrap(), "/foo/baz.txt");
400+
401+
let r = resolve_path("/foo/bar/./baz.txt".as_ref()).unwrap();
402+
assert_eq!(r.to_str().unwrap(), "/foo/bar/baz.txt");
403+
404+
let r = resolve_path("/foo/bar/../../../baz.txt".as_ref());
405+
assert_eq!(r, None);
406+
}
407+
408+
// Checks that `url_from_path` keeps or omits the trailing `/` of a relative
// path in the resulting URL path.
// NOTE(review): presumably `foo/bar` does not exist in the test's working
// directory, so classification falls back to the trailing separator; confirm.
#[test]
409+
fn test_url_from_path() {
410+
let url = url_from_path("foo/bar").unwrap();
411+
assert!(url.path().ends_with("foo/bar"));
412+
413+
let url = url_from_path("foo/bar/").unwrap();
414+
assert!(url.path().ends_with("foo/bar/"));
415+
}
371416
}

datafusion/core/src/datasource/listing_table_factory.rs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -147,9 +147,6 @@ impl TableProviderFactory for ListingTableFactory {
147147
.unwrap_or(false)
148148
};
149149

150-
let create_local_path = statement_options
151-
.take_bool_option("create_local_path")?
152-
.unwrap_or(false);
153150
let single_file = statement_options
154151
.take_bool_option("single_file")?
155152
.unwrap_or(false);
@@ -205,13 +202,7 @@ impl TableProviderFactory for ListingTableFactory {
205202
FileType::AVRO => file_type_writer_options,
206203
};
207204

208-
let table_path = match create_local_path {
209-
true => ListingTableUrl::parse_create_local_if_not_exists(
210-
&cmd.location,
211-
!single_file,
212-
),
213-
false => ListingTableUrl::parse(&cmd.location),
214-
}?;
205+
let table_path = ListingTableUrl::parse(&cmd.location)?;
215206

216207
let options = ListingOptions::new(file_format)
217208
.with_collect_stat(state.config().collect_statistics())

datafusion/core/src/physical_planner.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -565,11 +565,7 @@ impl DefaultPhysicalPlanner {
565565
copy_options,
566566
}) => {
567567
let input_exec = self.create_initial_plan(input, session_state).await?;
568-
569-
// TODO: make this behavior configurable via options (should copy to create path/file as needed?)
570-
// TODO: add additional configurable options for if existing files should be overwritten or
571-
// appended to
572-
let parsed_url = ListingTableUrl::parse_create_local_if_not_exists(output_url, !*single_file_output)?;
568+
let parsed_url = ListingTableUrl::parse(output_url)?;
573569
let object_store_url = parsed_url.object_store();
574570

575571
let schema: Schema = (**input.schema()).clone().into();

0 commit comments

Comments
 (0)