diff --git a/.env.example b/.env.example index bcb6952d97d..083076a9693 100644 --- a/.env.example +++ b/.env.example @@ -216,3 +216,6 @@ OPENDAL_ICLOUD_PASSWORD= OPENDAL_ICLOUD_TRUST_TOKEN= OPENDAL_ICLOUD_DS_WEB_AUTH_TOKEN= OPENDAL_ICLOUD_IS_CHINA_MAINLAND=true +# vercel blob +OPENDAL_VERCEL_BLOB_ROOT=/path/to/dir +OPENDAL_VERCEL_BLOB_TOKEN= diff --git a/core/Cargo.toml b/core/Cargo.toml index 81a1668eeef..40ffb52de71 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -199,6 +199,7 @@ services-swift = [] services-tikv = ["tikv-client"] services-upyun = ["dep:hmac", "dep:sha1"] services-vercel-artifacts = [] +services-vercel-blob = [] # Deprecated # wasabi services support has been removed. # We will remove this feature in the next version. diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs index e7ffb008444..16f467329b6 100644 --- a/core/src/services/mod.rs +++ b/core/src/services/mod.rs @@ -377,3 +377,10 @@ mod koofr; pub use koofr::Koofr; #[cfg(feature = "services-koofr")] pub use koofr::KoofrConfig; + +#[cfg(feature = "services-vercel-blob")] +mod vercel_blob; +#[cfg(feature = "services-vercel-blob")] +pub use vercel_blob::VercelBlob; +#[cfg(feature = "services-vercel-blob")] +pub use vercel_blob::VercelBlobConfig; diff --git a/core/src/services/vercel_blob/backend.rs b/core/src/services/vercel_blob/backend.rs new file mode 100644 index 00000000000..4f63fc0362a --- /dev/null +++ b/core/src/services/vercel_blob/backend.rs @@ -0,0 +1,283 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use http::StatusCode;
+use log::debug;
+use serde::Deserialize;
+
+use super::core::parse_blob;
+use super::core::Blob;
+use super::core::VercelBlobCore;
+use super::error::parse_error;
+use super::lister::VercelBlobLister;
+use super::writer::VercelBlobWriter;
+use super::writer::VercelBlobWriters;
+use crate::raw::*;
+use crate::*;
+
+/// Config for VercelBlob services support.
+#[derive(Default, Deserialize)]
+#[serde(default)]
+#[non_exhaustive]
+pub struct VercelBlobConfig {
+    /// root of this backend.
+    ///
+    /// All operations will happen under this root.
+    pub root: Option<String>,
+    /// vercel blob token.
+    pub token: String,
+}
+
+impl Debug for VercelBlobConfig {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut ds = f.debug_struct("Config");
+
+        ds.field("root", &self.root);
+
+        ds.finish()
+    }
+}
+
+/// [VercelBlob](https://vercel.com/docs/storage/vercel-blob) services support.
+#[doc = include_str!("docs.md")]
+#[derive(Default)]
+pub struct VercelBlobBuilder {
+    config: VercelBlobConfig,
+
+    http_client: Option<HttpClient>,
+}
+
+impl Debug for VercelBlobBuilder {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut d = f.debug_struct("VercelBlobBuilder");
+
+        d.field("config", &self.config);
+        d.finish_non_exhaustive()
+    }
+}
+
+impl VercelBlobBuilder {
+    /// Set root of this backend.
+    ///
+    /// All operations will happen under this root.
+ pub fn root(&mut self, root: &str) -> &mut Self { + self.config.root = if root.is_empty() { + None + } else { + Some(root.to_string()) + }; + + self + } + + /// Vercel Blob token. + /// + /// Get from Vercel environment variable `BLOB_READ_WRITE_TOKEN`. + /// It is required. + pub fn token(&mut self, token: &str) -> &mut Self { + self.config.token = token.to_string(); + + self + } + + /// Specify the http client that used by this service. + /// + /// # Notes + /// + /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed + /// during minor updates. + pub fn http_client(&mut self, client: HttpClient) -> &mut Self { + self.http_client = Some(client); + self + } +} + +impl Builder for VercelBlobBuilder { + const SCHEME: Scheme = Scheme::VercelBlob; + type Accessor = VercelBlobBackend; + + /// Converts a HashMap into an VercelBlobBuilder instance. + /// + /// # Arguments + /// + /// * `map` - A HashMap containing the configuration values. + /// + /// # Returns + /// + /// Returns an instance of VercelBlobBuilder. + fn from_map(map: HashMap) -> Self { + // Deserialize the configuration from the HashMap. + let config = VercelBlobConfig::deserialize(ConfigDeserializer::new(map)) + .expect("config deserialize must succeed"); + + // Create an VercelBlobBuilder instance with the deserialized config. + VercelBlobBuilder { + config, + http_client: None, + } + } + + /// Builds the backend and returns the result of VercelBlobBackend. + fn build(&mut self) -> Result { + debug!("backend build started: {:?}", &self); + + let root = normalize_root(&self.config.root.clone().unwrap_or_default()); + debug!("backend use root {}", &root); + + // Handle token. 
+ if self.config.token.is_empty() { + return Err(Error::new(ErrorKind::ConfigInvalid, "token is empty") + .with_operation("Builder::build") + .with_context("service", Scheme::VercelBlob)); + } + + let client = if let Some(client) = self.http_client.take() { + client + } else { + HttpClient::new().map_err(|err| { + err.with_operation("Builder::build") + .with_context("service", Scheme::VercelBlob) + })? + }; + + Ok(VercelBlobBackend { + core: Arc::new(VercelBlobCore { + root, + token: self.config.token.clone(), + client, + }), + }) + } +} + +/// Backend for VercelBlob services. +#[derive(Debug, Clone)] +pub struct VercelBlobBackend { + core: Arc, +} + +#[async_trait] +impl Accessor for VercelBlobBackend { + type Reader = IncomingAsyncBody; + type Writer = VercelBlobWriters; + type Lister = oio::PageLister; + type BlockingReader = (); + type BlockingWriter = (); + type BlockingLister = (); + + fn info(&self) -> AccessorInfo { + let mut am = AccessorInfo::default(); + am.set_scheme(Scheme::VercelBlob) + .set_root(&self.core.root) + .set_native_capability(Capability { + stat: true, + + read: true, + read_can_next: true, + read_with_range: true, + + write: true, + write_can_empty: true, + write_can_multi: true, + write_multi_min_size: Some(5 * 1024 * 1024), + + delete: true, + copy: true, + + list: true, + list_with_limit: true, + + ..Default::default() + }); + + am + } + + async fn stat(&self, path: &str, _args: OpStat) -> Result { + let resp = self.core.head(path).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + let bs = resp.into_body().bytes().await?; + + let resp: Blob = serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?; + + parse_blob(&resp).map(RpStat::new) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { + let resp = self.core.download(path, args).await?; + + let status = resp.status(); + + match status { + StatusCode::OK | 
StatusCode::PARTIAL_CONTENT => { + let size = parse_content_length(resp.headers())?; + let range = parse_content_range(resp.headers())?; + Ok(( + RpRead::new().with_size(size).with_range(range), + resp.into_body(), + )) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + let concurrent = args.concurrent(); + let writer = VercelBlobWriter::new(self.core.clone(), args, path.to_string()); + + let w = oio::MultipartWriter::new(writer, concurrent); + + Ok((RpWrite::default(), w)) + } + + async fn delete(&self, path: &str, _: OpDelete) -> Result { + self.core.delete(path).await.map(|_| RpDelete::default()) + } + + async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result { + let resp = self.core.copy(from, to).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + resp.into_body().consume().await?; + + Ok(RpCopy::default()) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { + let l = VercelBlobLister::new(self.core.clone(), path, args.limit()); + Ok((RpList::default(), oio::PageLister::new(l))) + } +} diff --git a/core/src/services/vercel_blob/core.rs b/core/src/services/vercel_blob/core.rs new file mode 100644 index 00000000000..54900dfff8d --- /dev/null +++ b/core/src/services/vercel_blob/core.rs @@ -0,0 +1,433 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::Debug; +use std::fmt::Formatter; + +use bytes::Bytes; +use http::header; +use http::request; +use http::Request; +use http::Response; +use http::StatusCode; +use serde::Deserialize; +use serde::Serialize; +use serde_json::json; + +use self::constants::*; +use crate::raw::*; +use crate::*; + +use super::error::parse_error; + +pub(super) mod constants { + // https://github.com/vercel/storage/blob/main/packages/blob/src/put.ts#L16 + // x-content-type specifies the MIME type of the file being uploaded. + pub const X_VERCEL_BLOB_CONTENT_TYPE: &str = "x-content-type"; + // x-add-random-suffix specifying whether to add a random suffix to the pathname + // Default value is 1, which means to add a random suffix. + // Set it to 0 to disable the random suffix. + pub const X_VERCEL_BLOB_ADD_RANDOM_SUFFIX: &str = "x-add-random-suffix"; + // https://github.com/vercel/storage/blob/main/packages/blob/src/put-multipart.ts#L84 + // x-mpu-action specifies the action to perform on the MPU. + // Possible values are: + // - create: create a new MPU. + // - upload: upload a part to an existing MPU. + // - complete: complete an existing MPU. + pub const X_VERCEL_BLOB_MPU_ACTION: &str = "x-mpu-action"; + pub const X_VERCEL_BLOB_MPU_KEY: &str = "x-mpu-key"; + pub const X_VERCEL_BLOB_MPU_PART_NUMBER: &str = "x-mpu-part-number"; + pub const X_VERCEL_BLOB_MPU_UPLOAD_ID: &str = "x-mpu-upload-id"; +} + +#[derive(Clone)] +pub struct VercelBlobCore { + /// The root of this core. + pub root: String, + /// Vercel Blob token. 
+    pub token: String,
+
+    pub client: HttpClient,
+}
+
+impl Debug for VercelBlobCore {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Backend")
+            .field("root", &self.root)
+            .finish_non_exhaustive()
+    }
+}
+
+impl VercelBlobCore {
+    #[inline]
+    pub async fn send(&self, req: Request<AsyncBody>) -> Result<Response<IncomingAsyncBody>> {
+        self.client.send(req).await
+    }
+
+    pub fn sign(&self, req: request::Builder) -> request::Builder {
+        req.header(header::AUTHORIZATION, format!("Bearer {}", self.token))
+    }
+}
+
+impl VercelBlobCore {
+    pub async fn download(&self, path: &str, args: OpRead) -> Result<Response<IncomingAsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+        // Vercel blob uses an unguessable random id url to download the file
+        // So we use list to get the url of the file and then use it to download the file
+        let resp = self.list(&p, Some(1)).await?;
+
+        // Use the matched url to download the file
+        let url = resolve_blob(resp.blobs, p);
+
+        if url.is_empty() {
+            return Err(Error::new(ErrorKind::NotFound, "Blob not found"));
+        }
+
+        let mut req = Request::get(url);
+
+        let range = args.range();
+        if !range.is_full() {
+            req = req.header(http::header::RANGE, range.to_header());
+        }
+
+        // Set body
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.send(req).await
+    }
+
+    pub async fn get_put_request(
+        &self,
+        path: &str,
+        size: Option<u64>,
+        args: &OpWrite,
+        body: AsyncBody,
+    ) -> Result<Request<AsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "https://blob.vercel-storage.com/{}",
+            percent_encode_path(&p)
+        );
+
+        let mut req = Request::put(&url);
+
+        req = req.header(X_VERCEL_BLOB_ADD_RANDOM_SUFFIX, "0");
+
+        if let Some(size) = size {
+            req = req.header(header::CONTENT_LENGTH, size.to_string())
+        }
+
+        if let Some(mime) = args.content_type() {
+            req = req.header(X_VERCEL_BLOB_CONTENT_TYPE, mime)
+        }
+
+        let req = self.sign(req);
+
+        // Set body
+        let req = req.body(body).map_err(new_request_build_error)?;
+
+        Ok(req)
+    }
+
pub async fn delete(&self, path: &str) -> Result<()> { + let p = build_abs_path(&self.root, path); + + let resp = self.list(&p, Some(1)).await?; + + let url = resolve_blob(resp.blobs, p); + + if url.is_empty() { + return Ok(()); + } + + let req = Request::post("https://blob.vercel-storage.com/delete"); + + let req = self.sign(req); + + let req_body = &json!({ + "urls": vec![url] + }); + + let req = req + .header(header::CONTENT_TYPE, "application/json") + .body(AsyncBody::Bytes(Bytes::from(req_body.to_string()))) + .map_err(new_request_build_error)?; + + let resp = self.send(req).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => Ok(()), + _ => Err(parse_error(resp).await?), + } + } + + pub async fn head(&self, path: &str) -> Result> { + let p = build_abs_path(&self.root, path); + + let resp = self.list(&p, Some(1)).await?; + + let url = resolve_blob(resp.blobs, p); + + if url.is_empty() { + return Err(Error::new(ErrorKind::NotFound, "Blob not found")); + } + + let req = Request::get(format!( + "https://blob.vercel-storage.com?url={}", + percent_encode_path(&url) + )); + + let req = self.sign(req); + + // Set body + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.send(req).await + } + + pub async fn copy(&self, from: &str, to: &str) -> Result> { + let from = build_abs_path(&self.root, from); + + let resp = self.list(&from, Some(1)).await?; + + let from_url = resolve_blob(resp.blobs, from); + + if from_url.is_empty() { + return Err(Error::new(ErrorKind::NotFound, "Blob not found")); + } + + let to = build_abs_path(&self.root, to); + + let to_url = format!( + "https://blob.vercel-storage.com/{}?fromUrl={}", + percent_encode_path(&to), + percent_encode_path(&from_url), + ); + + let req = Request::put(&to_url); + + let req = self.sign(req); + + // Set body + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.send(req).await + } + + pub async fn list(&self, prefix: 
&str, limit: Option) -> Result { + let prefix = if prefix == "/" { "" } else { prefix }; + + let mut url = format!( + "https://blob.vercel-storage.com?prefix={}", + percent_encode_path(prefix) + ); + + if let Some(limit) = limit { + url.push_str(&format!("&limit={}", limit)) + } + + let req = Request::get(&url); + + let req = self.sign(req); + + // Set body + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + let resp = self.send(req).await?; + + let status = resp.status(); + + if status != StatusCode::OK { + return Err(parse_error(resp).await?); + } + + let body = resp.into_body().bytes().await?; + + let resp: ListResponse = + serde_json::from_slice(&body).map_err(new_json_deserialize_error)?; + + Ok(resp) + } + + pub async fn initiate_multipart_upload( + &self, + path: &str, + args: &OpWrite, + ) -> Result> { + let p = build_abs_path(&self.root, path); + + let url = format!( + "https://blob.vercel-storage.com/mpu/{}", + percent_encode_path(&p) + ); + + let req = Request::post(&url); + + let mut req = self.sign(req); + + req = req.header(X_VERCEL_BLOB_MPU_ACTION, "create"); + req = req.header(X_VERCEL_BLOB_ADD_RANDOM_SUFFIX, "0"); + + if let Some(mime) = args.content_type() { + req = req.header(X_VERCEL_BLOB_CONTENT_TYPE, mime); + }; + + // Set body + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.send(req).await + } + + pub async fn upload_part( + &self, + path: &str, + upload_id: &str, + part_number: usize, + size: u64, + body: AsyncBody, + ) -> Result> { + let p = build_abs_path(&self.root, path); + + let url = format!( + "https://blob.vercel-storage.com/mpu/{}", + percent_encode_path(&p) + ); + + let mut req = Request::post(&url); + + req = req.header(header::CONTENT_LENGTH, size); + req = req.header(X_VERCEL_BLOB_MPU_ACTION, "upload"); + req = req.header(X_VERCEL_BLOB_MPU_KEY, p); + req = req.header(X_VERCEL_BLOB_MPU_UPLOAD_ID, upload_id); + req = req.header(X_VERCEL_BLOB_MPU_PART_NUMBER, 
part_number); + + let req = self.sign(req); + + // Set body + let req = req.body(body).map_err(new_request_build_error)?; + + self.send(req).await + } + + pub async fn complete_multipart_upload( + &self, + path: &str, + upload_id: &str, + parts: Vec, + ) -> Result> { + let p = build_abs_path(&self.root, path); + + let url = format!( + "https://blob.vercel-storage.com/mpu/{}", + percent_encode_path(&p) + ); + + let mut req = Request::post(&url); + + req = req.header(X_VERCEL_BLOB_MPU_ACTION, "complete"); + req = req.header(X_VERCEL_BLOB_MPU_KEY, p); + req = req.header(X_VERCEL_BLOB_MPU_UPLOAD_ID, upload_id); + + let req = self.sign(req); + + let parts_json = json!(parts); + + let req = req + .header(header::CONTENT_TYPE, "application/json") + .body(AsyncBody::Bytes(Bytes::from(parts_json.to_string()))) + .map_err(new_request_build_error)?; + + self.send(req).await + } +} + +pub fn parse_blob(blob: &Blob) -> Result { + let mode = if blob.pathname.ends_with('/') { + EntryMode::DIR + } else { + EntryMode::FILE + }; + let mut md = Metadata::new(mode); + if let Some(content_type) = blob.content_type.clone() { + md.set_content_type(&content_type); + } + md.set_content_length(blob.size); + md.set_last_modified(parse_datetime_from_rfc3339(&blob.uploaded_at)?); + md.set_content_disposition(&blob.content_disposition); + Ok(md) +} + +fn resolve_blob(blobs: Vec, path: String) -> String { + for blob in blobs { + if blob.pathname == path { + return blob.url; + } + } + "".to_string() +} + +#[derive(Default, Debug, Clone, PartialEq, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ListResponse { + pub cursor: Option, + pub has_more: bool, + pub blobs: Vec, +} + +#[derive(Default, Debug, Clone, PartialEq, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Blob { + pub url: String, + pub pathname: String, + pub size: u64, + pub uploaded_at: String, + pub content_disposition: String, + pub content_type: Option, +} + +#[derive(Default, Debug, Clone, PartialEq, 
Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct Part {
+    pub part_number: usize,
+    pub etag: String,
+}
+
+#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct InitiateMultipartUploadResponse {
+    pub upload_id: String,
+    pub key: String,
+}
+
+#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct UploadPartResponse {
+    pub etag: String,
+}
diff --git a/core/src/services/vercel_blob/docs.md b/core/src/services/vercel_blob/docs.md
new file mode 100644
index 00000000000..f640e891871
--- /dev/null
+++ b/core/src/services/vercel_blob/docs.md
@@ -0,0 +1,47 @@
+## Capabilities
+
+This service can be used to:
+
+- [x] stat
+- [x] read
+- [x] write
+- [ ] create_dir
+- [x] delete
+- [x] copy
+- [ ] rename
+- [x] list
+- [x] scan
+- [ ] presign
+- [ ] blocking
+
+## Configuration
+
+- `root`: Set the work directory for backend
+- `token`: VercelBlob token, environment var `BLOB_READ_WRITE_TOKEN`
+
+You can refer to [`VercelBlobBuilder`]'s docs for more information
+
+## Example
+
+### Via Builder
+
+```rust
+use anyhow::Result;
+use opendal::services::VercelBlob;
+use opendal::Operator;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    // create backend builder
+    let mut builder = VercelBlob::default();
+
+    // set the working directory for OpenDAL
+    builder.root("/");
+    // set the token for OpenDAL
+    builder.token("your_token");
+
+    let op: Operator = Operator::new(builder)?.finish();
+
+    Ok(())
+}
+```
diff --git a/core/src/services/vercel_blob/error.rs b/core/src/services/vercel_blob/error.rs
new file mode 100644
index 00000000000..2d689983d5c
--- /dev/null
+++ b/core/src/services/vercel_blob/error.rs
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use bytes::Buf; +use http::Response; +use quick_xml::de; +use serde::Deserialize; + +use crate::raw::*; +use crate::Error; +use crate::ErrorKind; +use crate::Result; + +/// VercelBlobError is the error returned by VercelBlob service. +#[derive(Default, Debug, Deserialize)] +#[serde(default, rename_all = "PascalCase")] +struct VercelBlobError { + error: VercelBlobErrorDetail, +} + +#[derive(Default, Debug, Deserialize)] +#[serde(default, rename_all = "PascalCase")] +struct VercelBlobErrorDetail { + code: String, + message: Option, +} + +/// Parse error response into Error. 
+pub async fn parse_error(resp: Response) -> Result { + let (parts, body) = resp.into_parts(); + let bs = body.bytes().await?; + + let (kind, retryable) = match parts.status.as_u16() { + 403 => (ErrorKind::PermissionDenied, false), + 404 => (ErrorKind::NotFound, false), + 500 | 502 | 503 | 504 => (ErrorKind::Unexpected, true), + _ => (ErrorKind::Unexpected, false), + }; + + let (message, _vercel_blob_err) = de::from_reader::<_, VercelBlobError>(bs.clone().reader()) + .map(|vercel_blob_err| (format!("{vercel_blob_err:?}"), Some(vercel_blob_err))) + .unwrap_or_else(|_| (String::from_utf8_lossy(&bs).into_owned(), None)); + + let mut err = Error::new(kind, &message); + + err = with_error_response_context(err, parts); + + if retryable { + err = err.set_temporary(); + } + + Ok(err) +} + +#[cfg(test)] +mod test { + use futures::stream; + use http::StatusCode; + + use super::*; + + #[tokio::test] + async fn test_parse_error() { + let err_res = vec![( + r#"{ + "error": { + "code": "forbidden", + "message": "Invalid token" + } + }"#, + ErrorKind::PermissionDenied, + StatusCode::FORBIDDEN, + )]; + + for res in err_res { + let bs = bytes::Bytes::from(res.0); + let body = IncomingAsyncBody::new( + Box::new(oio::into_stream(stream::iter(vec![Ok(bs.clone())]))), + None, + ); + let resp = Response::builder().status(res.2).body(body).unwrap(); + + let err = parse_error(resp).await; + + assert!(err.is_ok()); + assert_eq!(err.unwrap().kind(), res.1); + } + + let bs = bytes::Bytes::from( + r#"{ + "error": { + "code": "forbidden", + "message": "Invalid token" + } + }"#, + ); + + let out: VercelBlobError = serde_json::from_reader(bs.reader()).expect("must success"); + println!("{out:?}"); + + assert_eq!(out.error.code, "forbidden"); + assert_eq!(out.error.message, Some("Invalid token".to_string())); + } +} diff --git a/core/src/services/vercel_blob/lister.rs b/core/src/services/vercel_blob/lister.rs new file mode 100644 index 00000000000..e792d8f3791 --- /dev/null +++ 
b/core/src/services/vercel_blob/lister.rs @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use async_trait::async_trait; + +use super::core::parse_blob; +use super::core::VercelBlobCore; +use crate::raw::oio::Entry; +use crate::raw::*; +use crate::Result; + +pub struct VercelBlobLister { + core: Arc, + + path: String, + limit: Option, +} + +impl VercelBlobLister { + pub(super) fn new(core: Arc, path: &str, limit: Option) -> Self { + VercelBlobLister { + core, + path: path.to_string(), + limit, + } + } +} + +#[async_trait] +impl oio::PageList for VercelBlobLister { + async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { + let p = build_abs_path(&self.core.root, &self.path); + + let resp = self.core.list(&p, self.limit).await?; + + ctx.done = !resp.has_more; + + if let Some(cursor) = resp.cursor { + ctx.token = cursor; + } + + for blob in resp.blobs { + let path = build_rel_path(&self.core.root, &blob.pathname); + + if path == self.path { + continue; + } + + let md = parse_blob(&blob)?; + + ctx.entries.push_back(Entry::new(&path, md)); + } + + Ok(()) + } +} diff --git a/core/src/services/vercel_blob/mod.rs b/core/src/services/vercel_blob/mod.rs new file mode 
100644 index 00000000000..2804d3fe735 --- /dev/null +++ b/core/src/services/vercel_blob/mod.rs @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod backend; +pub use backend::VercelBlobBuilder as VercelBlob; +pub use backend::VercelBlobConfig; + +mod core; +mod error; +mod lister; +mod writer; diff --git a/core/src/services/vercel_blob/writer.rs b/core/src/services/vercel_blob/writer.rs new file mode 100644 index 00000000000..46b8e32b8bd --- /dev/null +++ b/core/src/services/vercel_blob/writer.rs @@ -0,0 +1,148 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use http::StatusCode; + +use super::core::{InitiateMultipartUploadResponse, Part, UploadPartResponse, VercelBlobCore}; +use super::error::parse_error; +use crate::raw::*; +use crate::*; + +pub type VercelBlobWriters = oio::MultipartWriter; + +pub struct VercelBlobWriter { + core: Arc, + op: OpWrite, + path: String, +} + +impl VercelBlobWriter { + pub fn new(core: Arc, op: OpWrite, path: String) -> Self { + VercelBlobWriter { core, op, path } + } +} + +#[async_trait] +impl oio::MultipartWrite for VercelBlobWriter { + async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> { + let req = self + .core + .get_put_request(&self.path, Some(size), &self.op, body) + .await?; + + let resp = self.core.send(req).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + resp.into_body().consume().await?; + Ok(()) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn initiate_part(&self) -> Result { + let resp = self + .core + .initiate_multipart_upload(&self.path, &self.op) + .await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + let bs = resp.into_body().bytes().await?; + + let resp = serde_json::from_slice::(&bs) + .map_err(new_json_deserialize_error)?; + + Ok(resp.upload_id) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn write_part( + &self, + upload_id: &str, + part_number: usize, + size: u64, + body: AsyncBody, + ) -> Result { + let part_number = part_number + 1; + + let resp = self + .core + 
.upload_part(&self.path, upload_id, part_number, size, body) + .await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + let bs = resp.into_body().bytes().await?; + + let resp = serde_json::from_slice::(&bs) + .map_err(new_json_deserialize_error)?; + + Ok(oio::MultipartPart { + part_number, + etag: resp.etag, + }) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn complete_part(&self, upload_id: &str, parts: &[oio::MultipartPart]) -> Result<()> { + let parts = parts + .iter() + .map(|p| Part { + part_number: p.part_number, + etag: p.etag.clone(), + }) + .collect::>(); + + let resp = self + .core + .complete_multipart_upload(&self.path, upload_id, parts) + .await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + resp.into_body().consume().await?; + + Ok(()) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn abort_part(&self, _upload_id: &str) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "VercelBlob does not support abort multipart upload", + )) + } +} diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs index d5ac384c033..a5767f59e39 100644 --- a/core/src/types/operator/builder.rs +++ b/core/src/types/operator/builder.rs @@ -257,6 +257,8 @@ impl Operator { Scheme::Tikv => Self::from_map::(map)?.finish(), #[cfg(feature = "services-vercel-artifacts")] Scheme::VercelArtifacts => Self::from_map::(map)?.finish(), + #[cfg(feature = "services-vercel-blob")] + Scheme::VercelBlob => Self::from_map::(map)?.finish(), #[cfg(feature = "services-webdav")] Scheme::Webdav => Self::from_map::(map)?.finish(), #[cfg(feature = "services-webhdfs")] diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs index 9276b1c3234..6620fa81454 100644 --- a/core/src/types/scheme.rs +++ b/core/src/types/scheme.rs @@ -44,6 +44,8 @@ pub enum Scheme { Seafile, /// [Upyun][crate::services::Upyun]: Upyun Services. 
Upyun, + /// [VercelBlob][crate::services::VercelBlob]: VercelBlob Services. + VercelBlob, /// [YandexDisk][crate::services::YandexDisk]: YandexDisk Services. YandexDisk, /// [Pcloud][crate::services::Pcloud]: Pcloud Services. @@ -279,6 +281,8 @@ impl Scheme { Scheme::Tikv, #[cfg(feature = "services-vercel-artifacts")] Scheme::VercelArtifacts, + #[cfg(feature = "services-vercel-blob")] + Scheme::VercelBlob, #[cfg(feature = "services-webdav")] Scheme::Webdav, #[cfg(feature = "services-webhdfs")] @@ -367,6 +371,7 @@ impl FromStr for Scheme { "swift" => Ok(Scheme::Swift), "oss" => Ok(Scheme::Oss), "vercel_artifacts" => Ok(Scheme::VercelArtifacts), + "vercel_blob" => Ok(Scheme::VercelBlob), "webdav" => Ok(Scheme::Webdav), "webhdfs" => Ok(Scheme::Webhdfs), "tikv" => Ok(Scheme::Tikv), @@ -427,6 +432,7 @@ impl From for &'static str { Scheme::Supabase => "supabase", Scheme::Swift => "swift", Scheme::VercelArtifacts => "vercel_artifacts", + Scheme::VercelBlob => "vercel_blob", Scheme::Oss => "oss", Scheme::Webdav => "webdav", Scheme::Webhdfs => "webhdfs", diff --git a/core/tests/behavior/async_write.rs b/core/tests/behavior/async_write.rs index fdef821b283..9b3124c456d 100644 --- a/core/tests/behavior/async_write.rs +++ b/core/tests/behavior/async_write.rs @@ -113,6 +113,11 @@ pub async fn test_write_with_special_chars(op: Operator) -> Result<()> { warn!("ignore test for atomicserver until https://github.com/atomicdata-dev/atomic-server/issues/663 is resolved"); return Ok(()); } + // Ignore test for vercel blob https://github.com/apache/opendal/pull/4103. + if op.info().scheme() == opendal::Scheme::VercelBlob { + warn!("ignore test for vercel blob https://github.com/apache/opendal/pull/4103"); + return Ok(()); + } let path = format!("{} !@#$%^&()_+-=;',.txt", uuid::Uuid::new_v4()); let (path, content, size) = TEST_FIXTURE.new_file_with_path(op.clone(), &path);