Skip to content

Commit 1ff5211

Browse files
authored
Merge branch 'main' into webdav-list-op
2 parents b04b300 + 91a0733 commit 1ff5211

File tree

4 files changed

+69
-77
lines changed

4 files changed

+69
-77
lines changed

src/raw/http_util/header.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use http::header::CONTENT_RANGE;
1919
use http::header::CONTENT_TYPE;
2020
use http::header::ETAG;
2121
use http::header::LAST_MODIFIED;
22+
use http::header::LOCATION;
2223
use http::HeaderMap;
2324
use time::format_description::well_known::Rfc2822;
2425
use time::OffsetDateTime;
@@ -30,6 +31,24 @@ use crate::ObjectMetadata;
3031
use crate::ObjectMode;
3132
use crate::Result;
3233

34+
/// Parse redirect location from header map
35+
///
36+
/// # Note
37+
/// The returned value maybe a relative path, like `/index.html`, `/robots.txt`, etc.
38+
pub fn parse_location(headers: &HeaderMap) -> Result<Option<&str>> {
39+
match headers.get(LOCATION) {
40+
None => Ok(None),
41+
Some(v) => Ok(Some(v.to_str().map_err(|e| {
42+
Error::new(
43+
ErrorKind::Unexpected,
44+
"header value has to be valid utf-8 string",
45+
)
46+
.with_operation("http_util::parse_location")
47+
.set_source(e)
48+
})?)),
49+
}
50+
}
51+
3352
/// Parse content length from header map.
3453
pub fn parse_content_length(headers: &HeaderMap) -> Result<Option<u64>> {
3554
match headers.get(CONTENT_LENGTH) {

src/raw/http_util/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ pub use header::parse_content_type;
3636
pub use header::parse_etag;
3737
pub use header::parse_into_object_metadata;
3838
pub use header::parse_last_modified;
39+
pub use header::parse_location;
3940

4041
mod uri;
4142
pub use uri::percent_encode_path;

src/services/webhdfs/backend.rs

Lines changed: 49 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ use super::message::BooleanResp;
3232
use super::message::FileStatusType;
3333
use super::message::FileStatusWrapper;
3434
use super::message::FileStatusesWrapper;
35-
use super::message::Redirection;
3635
use crate::ops::*;
3736
use crate::raw::*;
3837
use crate::*;
@@ -271,7 +270,7 @@ impl WebhdfsBackend {
271270
"CREATE"
272271
};
273272
let mut url = format!(
274-
"{}/webhdfs/v1/{}?op={}&overwrite=true&noredirect=true",
273+
"{}/webhdfs/v1/{}?op={}&overwrite=true",
275274
self.endpoint,
276275
percent_encode_path(&p),
277276
op,
@@ -283,21 +282,35 @@ impl WebhdfsBackend {
283282
let req = Request::put(&url)
284283
.body(AsyncBody::Empty)
285284
.map_err(new_request_build_error)?;
285+
286286
// mkdir does not redirect
287287
if path.ends_with('/') {
288288
return Ok(req);
289289
}
290290

291291
let resp = self.client.send_async(req).await?;
292292

293-
self.webhdfs_put_redirect(resp, size, content_type, body)
294-
.await
293+
// should be a 307 TEMPORARY_REDIRECT
294+
if resp.status() != StatusCode::TEMPORARY_REDIRECT {
295+
return Err(parse_error(resp).await?);
296+
}
297+
let re_url = self.follow_redirect(resp)?;
298+
299+
let mut re_builder = Request::put(re_url);
300+
if let Some(size) = size {
301+
re_builder = re_builder.header(CONTENT_LENGTH, size.to_string());
302+
}
303+
if let Some(content_type) = content_type {
304+
re_builder = re_builder.header(CONTENT_TYPE, content_type);
305+
}
306+
307+
re_builder.body(body).map_err(new_request_build_error)
295308
}
296309

297310
async fn webhdfs_open_req(&self, path: &str, range: &BytesRange) -> Result<Request<AsyncBody>> {
298311
let p = build_abs_path(&self.root, path);
299312
let mut url = format!(
300-
"{}/webhdfs/v1/{}?op=OPEN&noredirect=true",
313+
"{}/webhdfs/v1/{}?op=OPEN",
301314
self.endpoint,
302315
percent_encode_path(&p),
303316
);
@@ -378,15 +391,16 @@ impl WebhdfsBackend {
378391
let req = self.webhdfs_open_req(path, &range).await?;
379392
let resp = self.client.send_async(req).await?;
380393

381-
// this should be an 200 OK http response
382-
// with JSON redirect message in its body
383-
if resp.status() != StatusCode::OK {
384-
// let the outside handle this error
385-
return Ok(resp);
394+
// this should be a 307 redirect
395+
if resp.status() != StatusCode::TEMPORARY_REDIRECT {
396+
return Err(parse_error(resp).await?);
386397
}
387398

388-
let redirected = self.webhdfs_get_redirect(resp).await?;
389-
self.client.send_async(redirected).await
399+
let re_url = self.follow_redirect(resp)?;
400+
let re_req = Request::get(&re_url)
401+
.body(AsyncBody::Empty)
402+
.map_err(new_request_build_error)?;
403+
self.client.send_async(re_req).await
390404
}
391405

392406
async fn webhdfs_status_object(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
@@ -427,56 +441,32 @@ impl WebhdfsBackend {
427441

428442
self.client.send_async(req).await
429443
}
430-
431-
/// get redirect destination from 307 TEMPORARY_REDIRECT http response
432-
async fn follow_redirect(&self, resp: Response<IncomingAsyncBody>) -> Result<String> {
433-
let bs = resp.into_body().bytes().await.map_err(|e| {
434-
Error::new(ErrorKind::Unexpected, "redirection receive fail")
435-
.with_context("service", Scheme::Webhdfs)
436-
.set_source(e)
437-
})?;
438-
let loc = serde_json::from_reader::<_, Redirection>(bs.reader())
439-
.map_err(|e| {
440-
Error::new(ErrorKind::Unexpected, "redirection fail")
441-
.with_context("service", Scheme::Webhdfs)
442-
.set_permanent()
443-
.set_source(e)
444-
})?
445-
.location;
446-
447-
Ok(loc)
448-
}
449444
}
450445

451446
impl WebhdfsBackend {
452-
async fn webhdfs_get_redirect(
453-
&self,
454-
redirection: Response<IncomingAsyncBody>,
455-
) -> Result<Request<AsyncBody>> {
456-
let redirect = self.follow_redirect(redirection).await?;
457-
458-
Request::get(redirect)
459-
.body(AsyncBody::Empty)
460-
.map_err(new_request_build_error)
461-
}
462-
463-
async fn webhdfs_put_redirect(
464-
&self,
465-
resp: Response<IncomingAsyncBody>,
466-
size: Option<u64>,
467-
content_type: Option<&str>,
468-
body: AsyncBody,
469-
) -> Result<Request<AsyncBody>> {
470-
let redirect = self.follow_redirect(resp).await?;
471-
472-
let mut req = Request::put(redirect);
473-
if let Some(size) = size {
474-
req = req.header(CONTENT_LENGTH, size.to_string());
475-
}
476-
if let Some(content_type) = content_type {
477-
req = req.header(CONTENT_TYPE, content_type);
478-
}
479-
req.body(body).map_err(new_request_build_error)
447+
/// get redirect destination from 307 TEMPORARY_REDIRECT http response
448+
fn follow_redirect(&self, resp: Response<IncomingAsyncBody>) -> Result<String> {
449+
let loc = match parse_location(resp.headers())? {
450+
Some(p) => {
451+
if !p.starts_with('/') {
452+
// is not relative path
453+
p.to_string()
454+
} else {
455+
// is relative path
456+
// prefix with endpoint url
457+
let url = self.endpoint.clone();
458+
format!("{url}/{p}")
459+
}
460+
}
461+
None => {
462+
let err = Error::new(
463+
ErrorKind::Unexpected,
464+
"redirection fail: no location header",
465+
);
466+
return Err(err);
467+
}
468+
};
469+
Ok(loc)
480470
}
481471

482472
fn consume_success_mkdir(&self, path: &str, parts: Parts, body: &str) -> Result<RpCreate> {
@@ -497,9 +487,7 @@ impl WebhdfsBackend {
497487
));
498488
}
499489
}
500-
}
501490

502-
impl WebhdfsBackend {
503491
async fn check_root(&self) -> Result<()> {
504492
let resp = self.webhdfs_status_object("/").await?;
505493
match resp.status() {

src/services/webhdfs/message.rs

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,6 @@ use serde::Deserialize;
1818

1919
use crate::*;
2020

21-
#[derive(Debug, Deserialize)]
22-
#[serde(rename_all = "PascalCase")]
23-
pub(super) struct Redirection {
24-
pub location: String,
25-
}
26-
2721
#[derive(Debug, Deserialize)]
2822
pub(super) struct BooleanResp {
2923
pub boolean: bool,
@@ -89,16 +83,6 @@ mod test {
8983
use crate::services::webhdfs::dir_stream::DirStream;
9084
use crate::ObjectMode;
9185

92-
#[test]
93-
fn test_file_statuses() {
94-
let json = r#"{"Location":"http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=CREATE..."}"#;
95-
let redir: Redirection = serde_json::from_str(json).expect("must success");
96-
assert_eq!(
97-
redir.location,
98-
"http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=CREATE..."
99-
);
100-
}
101-
10286
#[test]
10387
fn test_file_status() {
10488
let json = r#"

0 commit comments

Comments
 (0)