Skip to content

Commit

Permalink
feat(core/services-azblob): support user defined metadata (#5274)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgehermo9 authored Nov 3, 2024
1 parent 10d64e2 commit fab80c9
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 41 deletions.
17 changes: 17 additions & 0 deletions core/src/raw/http_util/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;

use base64::engine::general_purpose;
use base64::Engine;
use chrono::DateTime;
Expand Down Expand Up @@ -189,6 +191,21 @@ pub fn parse_into_metadata(path: &str, headers: &HeaderMap) -> Result<Metadata>
Ok(m)
}

/// Parse prefixed headers and return a map with the prefix of each header removed.
pub fn parse_prefixed_headers(headers: &HeaderMap, prefix: &str) -> HashMap<String, String> {
headers
.iter()
.filter_map(|(name, value)| {
name.as_str().strip_prefix(prefix).and_then(|stripped_key| {
value
.to_str()
.ok()
.map(|parsed_value| (stripped_key.to_string(), parsed_value.to_string()))
})
})
.collect()
}

/// format content md5 header by given input.
pub fn format_content_md5(bs: &[u8]) -> String {
let mut hasher = md5::Md5::new();
Expand Down
1 change: 1 addition & 0 deletions core/src/raw/http_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub use header::parse_header_to_str;
pub use header::parse_into_metadata;
pub use header::parse_last_modified;
pub use header::parse_location;
pub use header::parse_prefixed_headers;

mod uri;
pub use uri::percent_decode_path;
Expand Down
14 changes: 13 additions & 1 deletion core/src/services/azblob/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use reqsign::AzureStorageSigner;
use sha2::Digest;
use sha2::Sha256;

use super::core::constants::X_MS_META_PREFIX;
use super::error::parse_error;
use super::lister::AzblobLister;
use super::writer::AzblobWriter;
Expand Down Expand Up @@ -517,6 +518,7 @@ impl Access for AzblobBackend {
write_can_multi: true,
write_with_cache_control: true,
write_with_content_type: true,
write_with_user_metadata: true,

delete: true,
copy: true,
Expand Down Expand Up @@ -545,7 +547,17 @@ impl Access for AzblobBackend {
let status = resp.status();

match status {
StatusCode::OK => parse_into_metadata(path, resp.headers()).map(RpStat::new),
StatusCode::OK => {
let headers = resp.headers();
let mut meta = parse_into_metadata(path, headers)?;

let user_meta = parse_prefixed_headers(headers, X_MS_META_PREFIX);
if !user_meta.is_empty() {
meta.with_user_metadata(user_meta);
}

Ok(RpStat::new(meta))
}
_ => Err(parse_error(resp)),
}
}
Expand Down
11 changes: 10 additions & 1 deletion core/src/services/azblob/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::time::Duration;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::Bytes;
use constants::X_MS_META_PREFIX;
use http::header::HeaderName;
use http::header::CONTENT_LENGTH;
use http::header::CONTENT_TYPE;
Expand All @@ -42,13 +43,14 @@ use uuid::Uuid;
use crate::raw::*;
use crate::*;

mod constants {
pub mod constants {
pub const X_MS_VERSION: &str = "x-ms-version";

pub const X_MS_BLOB_TYPE: &str = "x-ms-blob-type";
pub const X_MS_COPY_SOURCE: &str = "x-ms-copy-source";
pub const X_MS_BLOB_CACHE_CONTROL: &str = "x-ms-blob-cache-control";
pub const X_MS_BLOB_CONDITION_APPENDPOS: &str = "x-ms-blob-condition-appendpos";
pub const X_MS_META_PREFIX: &str = "x-ms-meta-";

// Server-side encryption with customer-provided headers
pub const X_MS_ENCRYPTION_KEY: &str = "x-ms-encryption-key";
Expand Down Expand Up @@ -243,12 +245,19 @@ impl AzblobCore {

let mut req = Request::put(&url);

if let Some(user_metadata) = args.user_metadata() {
for (key, value) in user_metadata {
req = req.header(format!("{X_MS_META_PREFIX}{key}"), value)
}
}

// Set SSE headers.
req = self.insert_sse_headers(req);

if let Some(cache_control) = args.cache_control() {
req = req.header(constants::X_MS_BLOB_CACHE_CONTROL, cache_control);
}

if let Some(size) = size {
req = req.header(CONTENT_LENGTH, size)
}
Expand Down
4 changes: 1 addition & 3 deletions core/src/services/oss/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,9 +489,7 @@ impl Access for OssBackend {
match status {
StatusCode::OK => {
let headers = resp.headers();
let mut meta =
self.core
.parse_metadata(path, constants::X_OSS_META_PREFIX, resp.headers())?;
let mut meta = self.core.parse_metadata(path, resp.headers())?;

if let Some(v) = parse_header_to_str(headers, "x-oss-version-id")? {
meta.set_version(v);
Expand Down
29 changes: 6 additions & 23 deletions core/src/services/oss/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::fmt::Write;
use std::time::Duration;

use bytes::Bytes;
use constants::X_OSS_META_PREFIX;
use http::header::CACHE_CONTROL;
use http::header::CONTENT_DISPOSITION;
use http::header::CONTENT_LENGTH;
Expand Down Expand Up @@ -190,7 +190,7 @@ impl OssCore {
"the format of the user metadata key is invalid, please refer the document",
));
}
req = req.header(format!("{}{}", constants::X_OSS_META_PREFIX, key), value)
req = req.header(format!("{X_OSS_META_PREFIX}{key}"), value)
}
}

Expand All @@ -213,28 +213,11 @@ impl OssCore {
/// # Notes
///
/// before return the user defined metadata, we'll strip the user_metadata_prefix from the key
pub fn parse_metadata(
&self,
path: &str,
user_metadata_prefix: &str,
headers: &HeaderMap,
) -> Result<Metadata> {
pub fn parse_metadata(&self, path: &str, headers: &HeaderMap) -> Result<Metadata> {
let mut m = parse_into_metadata(path, headers)?;

let data: HashMap<String, String> = headers
.iter()
.filter_map(|(key, _)| {
key.as_str()
.strip_prefix(user_metadata_prefix)
.and_then(|stripped_key| {
parse_header_to_str(headers, key)
.unwrap_or(None)
.map(|val| (stripped_key.to_string(), val.to_string()))
})
})
.collect();
if !data.is_empty() {
m.with_user_metadata(data);
let user_meta = parse_prefixed_headers(headers, X_OSS_META_PREFIX);
if !user_meta.is_empty() {
m.with_user_metadata(user_meta);
}

Ok(m)
Expand Down
14 changes: 2 additions & 12 deletions core/src/services/s3/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use std::sync::Arc;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::Buf;
use constants::X_AMZ_META_PREFIX;
use http::Response;
use http::StatusCode;
use log::debug;
Expand Down Expand Up @@ -970,18 +971,7 @@ impl Access for S3Backend {
let headers = resp.headers();
let mut meta = parse_into_metadata(path, headers)?;

let user_meta: HashMap<String, String> = headers
.iter()
.filter_map(|(name, _)| {
name.as_str()
.strip_prefix(constants::X_AMZ_META_PREFIX)
.and_then(|stripped_key| {
parse_header_to_str(headers, name)
.unwrap_or(None)
.map(|val| (stripped_key.to_string(), val.to_string()))
})
})
.collect();
let user_meta = parse_prefixed_headers(headers, X_AMZ_META_PREFIX);
if !user_meta.is_empty() {
meta.with_user_metadata(user_meta);
}
Expand Down
3 changes: 2 additions & 1 deletion core/src/services/s3/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use std::time::Duration;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::Bytes;
use constants::X_AMZ_META_PREFIX;
use http::header::HeaderName;
use http::header::CACHE_CONTROL;
use http::header::CONTENT_DISPOSITION;
Expand Down Expand Up @@ -462,7 +463,7 @@ impl S3Core {
// Set user metadata headers.
if let Some(user_metadata) = args.user_metadata() {
for (key, value) in user_metadata {
req = req.header(format!("{}{}", constants::X_AMZ_META_PREFIX, key), value)
req = req.header(format!("{X_AMZ_META_PREFIX}{key}"), value)
}
}

Expand Down

0 comments on commit fab80c9

Please sign in to comment.