Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Use opendal to replace object_store #978

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ with-db = ["dep:sea-orm", "dep:sea-orm-migration", "loco-gen/with-db"]
channels = ["dep:socketioxide"]
# Storage features
all_storage = ["storage_aws_s3", "storage_azure", "storage_gcp"]
storage_aws_s3 = ["object_store/aws"]
storage_azure = ["object_store/azure"]
storage_gcp = ["object_store/gcp"]
storage_aws_s3 = ["opendal/services-s3"]
storage_azure = ["opendal/services-azblob"]
storage_gcp = ["opendal/services-gcs"]
# Cache feature
cache_inmem = ["dep:moka"]
bg_redis = ["dep:rusty-sidekiq", "dep:bb8"]
Expand Down Expand Up @@ -125,7 +125,7 @@ socketioxide = { version = "0.14.0", features = ["state"], optional = true }


# File Upload
object_store = { version = "0.11.0", default-features = false }
opendal = { version = "0.50.2", default-features = false,features = ["services-memory","services-fs"] }

# cache
moka = { version = "0.12.7", features = ["sync"], optional = true }
Expand Down
4 changes: 2 additions & 2 deletions src/bgworker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,8 @@ impl Queue {

/// # Errors
///
/// Does not currently return an error, but the postgres or other future queue implementations
/// might, so using Result here as return type.
/// Does not currently return an error, but the postgres or other future
/// queue implementations might, so using Result here as return type.
pub fn shutdown(&self) -> Result<()> {
println!("waiting for running jobs to finish...");
match self {
Expand Down
12 changes: 8 additions & 4 deletions src/bgworker/sqlt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,8 @@ async fn dequeue(client: &SqlitePool) -> Result<Option<Task>> {

if let Some(task) = row {
sqlx::query(
"UPDATE sqlt_loco_queue SET status = 'processing', updated_at = CURRENT_TIMESTAMP WHERE id = $1",
"UPDATE sqlt_loco_queue SET status = 'processing', updated_at = CURRENT_TIMESTAMP \
WHERE id = $1",
)
.bind(&task.id)
.execute(&mut *tx)
Expand Down Expand Up @@ -325,15 +326,17 @@ async fn complete_task(
if let Some(interval_ms) = interval_ms {
let next_run_at = Utc::now() + chrono::Duration::milliseconds(interval_ms);
sqlx::query(
"UPDATE sqlt_loco_queue SET status = 'queued', updated_at = CURRENT_TIMESTAMP, run_at = DATETIME($1) WHERE id = $2",
"UPDATE sqlt_loco_queue SET status = 'queued', updated_at = CURRENT_TIMESTAMP, run_at \
= DATETIME($1) WHERE id = $2",
)
.bind(next_run_at)
.bind(task_id)
.execute(pool)
.await?;
} else {
sqlx::query(
"UPDATE sqlt_loco_queue SET status = 'completed', updated_at = CURRENT_TIMESTAMP WHERE id = $1",
"UPDATE sqlt_loco_queue SET status = 'completed', updated_at = CURRENT_TIMESTAMP \
WHERE id = $1",
)
.bind(task_id)
.execute(pool)
Expand All @@ -347,7 +350,8 @@ async fn fail_task(pool: &SqlitePool, task_id: &TaskId, error: &crate::Error) ->
error!(err = msg, "failed task");
let error_json = serde_json::json!({ "error": msg });
sqlx::query(
"UPDATE sqlt_loco_queue SET status = 'failed', updated_at = CURRENT_TIMESTAMP, task_data = json_patch(task_data, $1) WHERE id = $2",
"UPDATE sqlt_loco_queue SET status = 'failed', updated_at = CURRENT_TIMESTAMP, task_data \
= json_patch(task_data, $1) WHERE id = $2",
)
.bind(error_json)
.bind(task_id)
Expand Down
3 changes: 1 addition & 2 deletions src/boot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ use std::path::PathBuf;
use axum::Router;
#[cfg(feature = "with-db")]
use sea_orm_migration::MigratorTrait;
use tokio::task::JoinHandle;
use tokio::{select, signal};
use tokio::{select, signal, task::JoinHandle};
use tracing::{debug, error, info, warn};

#[cfg(feature = "with-db")]
Expand Down
3 changes: 2 additions & 1 deletion src/mailer/email_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ impl EmailSender {
///
/// # Errors
///
/// When email doesn't send successfully or has an error to build the message
/// When email doesn't send successfully or has an error to build the
/// message
pub async fn mail(&self, email: &Email) -> Result<()> {
let content = MultiPart::alternative_plain_html(email.text.clone(), email.html.clone());
let mut builder = Message::builder()
Expand Down
59 changes: 18 additions & 41 deletions src/storage/drivers/aws.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,7 @@
#[cfg(test)]
use core::time::Duration;
use std::sync::Arc;

use object_store::{
aws::{AmazonS3Builder, AwsCredential},
StaticCredentialProvider,
};
#[cfg(test)]
use object_store::{BackoffConfig, RetryConfig};
use opendal::{services::S3, Operator};

use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver};
use crate::Result;
use super::{opendal_adapter::OpendalAdapter, StoreDriver};
use crate::storage::StorageResult;

/// A set of AWS security credentials
#[derive(Debug)]
Expand All @@ -34,14 +25,10 @@ pub struct Credential {
/// # Errors
///
/// When could not initialize the client instance
pub fn new(bucket_name: &str, region: &str) -> Result<Box<dyn StoreDriver>> {
let s3 = AmazonS3Builder::new()
.with_bucket_name(bucket_name)
.with_region(region)
.build()
.map_err(Box::from)?;
pub fn new(bucket_name: &str, region: &str) -> StorageResult<Box<dyn StoreDriver>> {
let s3 = S3::default().bucket(bucket_name).region(region);

Ok(Box::new(ObjectStoreAdapter::new(Box::new(s3))))
Ok(Box::new(OpendalAdapter::new(Operator::new(s3)?.finish())))
}

/// Create new AWS s3 storage with bucket, region and credentials.
Expand All @@ -64,18 +51,16 @@ pub fn with_credentials(
bucket_name: &str,
region: &str,
credentials: Credential,
) -> Result<Box<dyn StoreDriver>> {
let s3 = AmazonS3Builder::new()
.with_bucket_name(bucket_name)
.with_region(region)
.with_credentials(Arc::new(StaticCredentialProvider::new(AwsCredential {
key_id: credentials.key_id.to_string(),
secret_key: credentials.secret_key.to_string(),
token: credentials.token,
})))
.build()
.map_err(Box::from)?;
Ok(Box::new(ObjectStoreAdapter::new(Box::new(s3))))
) -> StorageResult<Box<dyn StoreDriver>> {
let mut s3 = S3::default()
.bucket(bucket_name)
.region(region)
.access_key_id(&credentials.key_id)
.secret_access_key(&credentials.secret_key);
if let Some(token) = credentials.token {
s3 = s3.session_token(&token);
}
Ok(Box::new(OpendalAdapter::new(Operator::new(s3)?.finish())))
}

/// Build store with failure
Expand All @@ -86,15 +71,7 @@ pub fn with_credentials(
#[cfg(test)]
#[must_use]
pub fn with_failure() -> Box<dyn StoreDriver> {
let s3 = AmazonS3Builder::new()
.with_bucket_name("loco-test")
.with_retry(RetryConfig {
backoff: BackoffConfig::default(),
max_retries: 0,
retry_timeout: Duration::from_secs(0),
})
.build()
.unwrap();
let s3 = S3::default().bucket("loco-test");

Box::new(ObjectStoreAdapter::new(Box::new(s3)))
Box::new(OpendalAdapter::new(Operator::new(s3).unwrap().finish()))
}
22 changes: 11 additions & 11 deletions src/storage/drivers/azure.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use object_store::azure::MicrosoftAzureBuilder;
use opendal::{services::Azblob, Operator};

use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver};
use crate::Result;
use super::StoreDriver;
use crate::storage::{drivers::opendal_adapter::OpendalAdapter, StorageResult};

/// Create new Azure storage.
///
Expand All @@ -18,13 +18,13 @@ pub fn new(
container_name: &str,
account_name: &str,
access_key: &str,
) -> Result<Box<dyn StoreDriver>> {
let azure = MicrosoftAzureBuilder::new()
.with_container_name(container_name)
.with_account(account_name)
.with_access_key(access_key)
.build()
.map_err(Box::from)?;
) -> StorageResult<Box<dyn StoreDriver>> {
let azure = Azblob::default()
.container(container_name)
.account_name(account_name)
.account_key(access_key);

Ok(Box::new(ObjectStoreAdapter::new(Box::new(azure))))
Ok(Box::new(OpendalAdapter::new(
Operator::new(azure)?.finish(),
)))
}
25 changes: 9 additions & 16 deletions src/storage/drivers/gcp.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,23 @@
use object_store::gcp::GoogleCloudStorageBuilder;
use opendal::{services::Gcs, Operator};

use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver};
use crate::Result;
use super::StoreDriver;
use crate::storage::{drivers::opendal_adapter::OpendalAdapter, StorageResult};

/// Create new GCP storage.
///
/// # Examples
///```
/// use loco_rs::storage::drivers::gcp;
/// let gcp_driver = gcp::new("key", "account_key", "service_account");
/// let gcp_driver = gcp::new("key", "credential_path");
/// ```
///
/// # Errors
///
/// When could not initialize the client instance
pub fn new(
bucket_name: &str,
service_account_key: &str,
service_account: &str,
) -> Result<Box<dyn StoreDriver>> {
let gcs = GoogleCloudStorageBuilder::new()
.with_bucket_name(bucket_name)
.with_service_account_key(service_account_key)
.with_service_account_path(service_account)
.build()
.map_err(Box::from)?;
pub fn new(bucket_name: &str, credential_path: &str) -> StorageResult<Box<dyn StoreDriver>> {
let gcs = Gcs::default()
.bucket(bucket_name)
.credential_path(credential_path);

Ok(Box::new(ObjectStoreAdapter::new(Box::new(gcs))))
Ok(Box::new(OpendalAdapter::new(Operator::new(gcs)?.finish())))
}
24 changes: 16 additions & 8 deletions src/storage/drivers/local.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use object_store::local::LocalFileSystem;
use opendal::{services::Fs, Operator};

use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver};
use crate::Result;
use super::StoreDriver;
use crate::storage::{drivers::opendal_adapter::OpendalAdapter, StorageResult};

/// Create new filesystem storage with no prefix
///
Expand All @@ -10,9 +10,18 @@ use crate::Result;
/// use loco_rs::storage::drivers::local;
/// let file_system_driver = local::new();
/// ```
///
/// # Panics
///
/// Panics if the filesystem service built failed.
#[must_use]
pub fn new() -> Box<dyn StoreDriver> {
Box::new(ObjectStoreAdapter::new(Box::new(LocalFileSystem::new())))
let fs = Fs::default();
Box::new(OpendalAdapter::new(
Operator::new(fs)
.expect("fs service should build with success")
.finish(),
))
}

/// Create new filesystem storage with `prefix` applied to all paths
Expand All @@ -26,8 +35,7 @@ pub fn new() -> Box<dyn StoreDriver> {
/// # Errors
///
/// Returns an error if the path does not exist
pub fn new_with_prefix(prefix: impl AsRef<std::path::Path>) -> Result<Box<dyn StoreDriver>> {
Ok(Box::new(ObjectStoreAdapter::new(Box::new(
LocalFileSystem::new_with_prefix(prefix).map_err(Box::from)?,
))))
pub fn new_with_prefix(prefix: impl AsRef<std::path::Path>) -> StorageResult<Box<dyn StoreDriver>> {
let fs = Fs::default().root(&prefix.as_ref().display().to_string());
Ok(Box::new(OpendalAdapter::new(Operator::new(fs)?.finish())))
}
15 changes: 12 additions & 3 deletions src/storage/drivers/mem.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use object_store::memory::InMemory;
use opendal::{services::Memory, Operator};

use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver};
use super::StoreDriver;
use crate::storage::drivers::opendal_adapter::OpendalAdapter;

/// Create new in-memory storage.
///
Expand All @@ -9,7 +10,15 @@ use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver};
/// use loco_rs::storage::drivers::mem;
/// let mem_storage = mem::new();
/// ```
///
/// # Panics
///
/// Panics if the memory service built failed.
#[must_use]
pub fn new() -> Box<dyn StoreDriver> {
Box::new(ObjectStoreAdapter::new(Box::new(InMemory::new())))
Box::new(OpendalAdapter::new(
Operator::new(Memory::default())
.expect("memory service must build with success")
.finish(),
))
}
29 changes: 25 additions & 4 deletions src/storage/drivers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use std::path::Path;

use async_trait::async_trait;
use bytes::Bytes;
use opendal::Reader;

#[cfg(feature = "storage_aws_s3")]
pub mod aws;
#[cfg(feature = "storage_azure")]
Expand All @@ -11,7 +13,7 @@ pub mod gcp;
pub mod local;
pub mod mem;
pub mod null;
pub mod object_store_adapter;
pub mod opendal_adapter;

use super::StorageResult;

Expand All @@ -21,9 +23,28 @@ pub struct UploadResponse {
pub version: Option<String>,
}

// TODO: need to properly abstract the object_store type in order to not
// strongly depend on it
pub type GetResponse = object_store::GetResult;
/// TODO: Add more methods to `GetResponse` to read the content in different
/// ways
///
/// For example, we can read a specific range of bytes from the stream.
pub struct GetResponse {
stream: Reader,
}

impl GetResponse {
pub(crate) fn new(stream: Reader) -> Self {
Self { stream }
}

/// Read all content from the stream and return as `Bytes`.
///
/// # Errors
///
/// Returns a `StorageError` with the reason for the failure.
pub async fn bytes(&self) -> StorageResult<Bytes> {
Ok(self.stream.read(..).await?.to_bytes())
}
}

#[async_trait]
pub trait StoreDriver: Sync + Send {
Expand Down
Loading