Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(services/azblob): support multi write for azblob #4181

Merged
merged 25 commits into from
Feb 18, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions core/src/services/azblob/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,8 +567,9 @@ impl Accessor for AzblobBackend {
read_with_override_content_disposition: true,

write: true,
write_can_empty: true,
write_can_append: true,
write_can_empty: true,
write_can_multi: true,
write_with_cache_control: true,
write_with_content_type: true,

Expand Down Expand Up @@ -631,7 +632,7 @@ impl Accessor for AzblobBackend {
let w = if args.append() {
AzblobWriters::Two(oio::AppendWriter::new(w))
} else {
AzblobWriters::One(oio::OneShotWriter::new(w))
AzblobWriters::One(oio::BlockWriter::new(w, args.concurrent()))
};

Ok((RpWrite::default(), w))
Expand Down
126 changes: 120 additions & 6 deletions core/src/services/azblob/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::fmt;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::fmt::Write;
use std::time::Duration;

use bytes::Bytes;
use http::header::HeaderName;
use http::header::CONTENT_LENGTH;
use http::header::CONTENT_TYPE;
Expand All @@ -33,6 +28,13 @@ use reqsign::AzureStorageCredential;
use reqsign::AzureStorageLoader;
use reqsign::AzureStorageSigner;
use serde::Deserialize;
use serde::Serialize;
use std::fmt;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::fmt::Write;
use std::time::Duration;
use uuid::Uuid;

use crate::raw::*;
use crate::*;
Expand Down Expand Up @@ -370,6 +372,102 @@ impl AzblobCore {
Ok(req)
}

pub fn azblob_put_block_list_request(
&self,
path: &str,
size: Option<u64>,
args: &OpWrite,
body: AsyncBody,
) -> Result<Request<AsyncBody>> {
// refer to https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?
let p = build_abs_path(&self.root, path);

let url = format!(
"{}/{}/{}?comp=blocklist",
self.endpoint,
self.container,
percent_encode_path(&p)
);
let mut req = Request::put(&url);

if let Some(size) = size {
req = req.header(CONTENT_LENGTH, size)
}
if let Some(content_type) = args.content_type() {
req = req.header(CONTENT_TYPE, content_type);
};

// Set SSE headers.
req = self.insert_sse_headers(req);

// Set body
let req = req.body(body).map_err(new_request_build_error)?;

Ok(req)
}

pub async fn azblob_put_block_list(
&self,
path: &str,
size: Option<u64>,
args: &OpWrite,
body: AsyncBody,
) -> Result<Response<IncomingAsyncBody>> {
let mut req = self.azblob_put_block_list_request(path, size, args, body)?;

self.sign(&mut req).await?;
self.send(req).await
}

pub async fn azblob_complete_put_block_list_request(
&self,
path: &str,
block_ids: Vec<Uuid>,
) -> Result<Request<AsyncBody>> {
let p = build_abs_path(&self.root, path);
let url = format!(
"{}/{}/{}?comp=blocklist",
self.endpoint,
self.container,
percent_encode_path(&p)
);

let req = Request::post(&url);

// Set SSE headers.
let req = self.insert_sse_headers(req);

// Set body
// refer to https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?
let req_body = {
let block_list = block_ids
.iter()
.map(|block_id| format!(" <Uncommitted>{}</Uncommitted>", block_id))
.collect::<Vec<String>>()
.join("\n");

format!("<BlockList>\n{}\n</BlockList>", block_list)
};

let body = AsyncBody::Bytes(Bytes::from(req_body.to_string()));
let req = req.body(body).map_err(new_request_build_error)?;
Ok(req)
}

pub async fn azblob_complete_put_block_list(
&self,
path: &str,
block_ids: Vec<Uuid>,
) -> Result<Response<IncomingAsyncBody>> {
let mut req = self
.azblob_complete_put_block_list_request(path, block_ids)
.await?;

self.sign(&mut req).await?;

self.send(req).await
}

pub fn azblob_head_blob_request(
&self,
path: &str,
Expand Down Expand Up @@ -548,6 +646,22 @@ pub struct Blobs {
pub blob_prefix: Vec<BlobPrefix>,
}

#[derive(Clone, Default, Debug, Serialize)]
#[serde(default, rename_all = "PascalCase")]
pub struct CompleteMultipartUploadRequestPart {
#[serde(rename = "PartNumber")]
pub part_number: usize,
#[serde(rename = "ETag")]
pub etag: String,
}

/// Request of CompleteMultipartUploadRequest
#[derive(Default, Debug, Serialize)]
#[serde(default, rename = "CompleteMultipartUpload", rename_all = "PascalCase")]
pub struct CompleteMultipartUploadRequest {
pub part: Vec<CompleteMultipartUploadRequestPart>,
}

#[derive(Default, Debug, Deserialize)]
#[serde(default, rename_all = "PascalCase")]
pub struct BlobPrefix {
Expand Down
101 changes: 72 additions & 29 deletions core/src/services/azblob/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use http::StatusCode;
use uuid::Uuid;

use super::core::AzblobCore;
use super::error::parse_error;
Expand All @@ -27,7 +28,7 @@ use crate::*;

const X_MS_BLOB_TYPE: &str = "x-ms-blob-type";

pub type AzblobWriters = TwoWays<oio::OneShotWriter<AzblobWriter>, oio::AppendWriter<AzblobWriter>>;
pub type AzblobWriters = TwoWays<oio::BlockWriter<AzblobWriter>, oio::AppendWriter<AzblobWriter>>;

pub struct AzblobWriter {
core: Arc<AzblobCore>,
Expand All @@ -42,34 +43,6 @@ impl AzblobWriter {
}
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl oio::OneShotWrite for AzblobWriter {
async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> {
let bs = oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
let mut req = self.core.azblob_put_blob_request(
&self.path,
Some(bs.len() as u64),
&self.op,
AsyncBody::ChunkedBytes(bs),
)?;

self.core.sign(&mut req).await?;

let resp = self.core.send(req).await?;

let status = resp.status();

match status {
StatusCode::CREATED | StatusCode::OK => {
resp.into_body().consume().await?;
Ok(())
}
_ => Err(parse_error(resp).await?),
}
}
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl oio::AppendWrite for AzblobWriter {
Expand Down Expand Up @@ -137,3 +110,73 @@ impl oio::AppendWrite for AzblobWriter {
}
}
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[async_trait]
impl oio::BlockWrite for AzblobWriter {
async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
let mut req: http::Request<AsyncBody> =
self.core
.azblob_put_blob_request(&self.path, Some(size), &self.op, body)?;
self.core.sign(&mut req).await?;

let resp = self.core.send(req).await?;

let status = resp.status();

match status {
StatusCode::CREATED | StatusCode::OK => {
resp.into_body().consume().await?;
Ok(())
}
_ => Err(parse_error(resp).await?),
}
}

async fn write_block(&self, _block_id: Uuid, size: u64, body: AsyncBody) -> Result<()> {
let resp = self
.core
.azblob_put_block_list(&self.path, Some(size), &self.op, body)
.await?;

let status = resp.status();
match status {
StatusCode::CREATED | StatusCode::OK => {
resp.into_body().consume().await?;
Ok(())
}
_ => Err(parse_error(resp).await?),
}
}

async fn complete_block(&self, block_ids: Vec<Uuid>) -> Result<()> {
let resp = self
.core
.azblob_complete_put_block_list(&self.path, block_ids)
.await?;

let status = resp.status();
match status {
StatusCode::CREATED | StatusCode::OK => {
resp.into_body().consume().await?;
Ok(())
}
_ => Err(parse_error(resp).await?),
}
}

async fn abort_block(&self, block_ids: Vec<Uuid>) -> Result<()> {
// Azure blob storage will gc uncommitted blocks after 7 days.
for block_id in block_ids {
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
let resp = self.core.azblob_delete_blob(&block_id.to_string()).await?;
match resp.status() {
StatusCode::OK => {
resp.into_body().consume().await?;
}
_ => return Err(parse_error(resp).await?),
}
}
Ok(())
}
}
Loading