Skip to content

Commit

Permalink
refactor(integrations/cloudfilter): implement Filter instead of SyncF…
Browse files Browse the repository at this point in the history
…ilter (#4920)

refactor: implement Filter instead of SyncFilter
  • Loading branch information
ho-229 authored Jul 25, 2024
1 parent c7cecc4 commit f17b508
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 58 deletions.
14 changes: 10 additions & 4 deletions integrations/cloudfilter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,19 @@ version = "0.0.0"
[dependencies]
anyhow = "1.0.86"
opendal = { version = "0.47.0", path = "../../core" }
widestring = "1.1.0"
cloud-filter = "0.0.2"
cloud-filter = "0.0.3"
futures = "0.3.30"
bincode = "1.3.3"
serde = { version = "1.0.203", features = ["derive"] }
log = "0.4.17"

[dev-dependencies]
tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread", "signal"] }
tokio = { version = "1.38.0", features = [
"macros",
"rt-multi-thread",
"signal",
] }
env_logger = "0.11.2"
opendal = { version = "0.47.0", path = "../../core", features = ["services-fs"] }
opendal = { version = "0.47.0", path = "../../core", features = [
"services-fs",
] }
39 changes: 19 additions & 20 deletions integrations/cloudfilter/examples/readonly.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
use std::{
env,
path::{Path, PathBuf},
};
use std::{env, path::PathBuf};

use cloud_filter::root::{
HydrationType, PopulationType, Registration, SecurityId, Session, SyncRootIdBuilder,
HydrationType, PopulationType, SecurityId, Session, SyncRootIdBuilder, SyncRootInfo,
};
use opendal::{services, Operator};
use tokio::signal;
use widestring::{u16str, U16String};
use tokio::{runtime::Handle, signal};

const PROVIDER_NAME: &str = "ro-cloudfilter";
const DISPLAY_NAME: &str = "Read Only Cloud Filter";
Expand All @@ -24,30 +20,33 @@ async fn main() {

let op = Operator::new(fs).expect("build operator").finish();

let sync_root_id = SyncRootIdBuilder::new(U16String::from_str(PROVIDER_NAME))
let sync_root_id = SyncRootIdBuilder::new(PROVIDER_NAME)
.user_security_id(SecurityId::current_user().unwrap())
.build();

if !sync_root_id.is_registered().unwrap() {
let u16_display_name = U16String::from_str(DISPLAY_NAME);
Registration::from_sync_root_id(&sync_root_id)
.display_name(&u16_display_name)
.hydration_type(HydrationType::Full)
.population_type(PopulationType::Full)
.icon(
U16String::from_str("%SystemRoot%\\system32\\charmap.exe"),
0,
sync_root_id
.register(
SyncRootInfo::default()
.with_display_name(DISPLAY_NAME)
.with_hydration_type(HydrationType::Full)
.with_population_type(PopulationType::Full)
.with_icon("%SystemRoot%\\system32\\charmap.exe,0")
.with_version("1.0.0")
.with_recycle_bin_uri("http://cloudmirror.example.com/recyclebin")
.unwrap()
.with_path(&client_path)
.unwrap(),
)
.version(u16str!("1.0.0"))
.recycle_bin_uri(u16str!("http://cloudmirror.example.com/recyclebin"))
.register(Path::new(&client_path))
.unwrap();
}

let handle = Handle::current();
let connection = Session::new()
.connect(
.connect_async(
&client_path,
cloudfilter_opendal::CloudFilter::new(op, PathBuf::from(&client_path)),
move |f| handle.clone().block_on(f),
)
.expect("create session");

Expand Down
63 changes: 29 additions & 34 deletions integrations/cloudfilter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,39 @@
mod file;

use std::{
cmp::min,
fs::{self, File},
io::{Read, Seek, SeekFrom},
path::{Path, PathBuf},
};

use cloud_filter::{
error::{CResult, CloudErrorKind},
filter::{info, ticket, SyncFilter},
filter::{info, ticket, Filter},
metadata::Metadata,
placeholder::{ConvertOptions, Placeholder},
placeholder_file::PlaceholderFile,
request::Request,
utility::{FileTime, WriteAt},
};
use file::FileBlob;
use opendal::{BlockingOperator, Entry, Metakey, Operator};
use futures::StreamExt;
use opendal::{Entry, Metakey, Operator};

const BUF_SIZE: usize = 65536;

pub struct CloudFilter {
op: BlockingOperator,
op: Operator,
root: PathBuf,
}

impl CloudFilter {
pub fn new(op: Operator, root: PathBuf) -> Self {
Self {
op: op.blocking(),
root,
}
Self { op, root }
}
}

impl SyncFilter for CloudFilter {
fn fetch_data(
impl Filter for CloudFilter {
async fn fetch_data(
&self,
request: Request,
ticket: ticket::FetchData,
Expand All @@ -71,38 +69,33 @@ impl SyncFilter for CloudFilter {
.strip_prefix(&self.root)
.map_err(|_| CloudErrorKind::NotUnderSyncRoot)?;

let mut reader = self
let reader = self
.op
.reader_with(&remote_path.to_string_lossy().replace('\\', "/"))
.call()
.await
.map_err(|e| {
log::warn!("failed to open file: {}", e);
CloudErrorKind::Unsuccessful
})?
.into_std_read(range.clone())
.map_err(|e| {
log::warn!("failed to read file: {}", e);
CloudErrorKind::Unsuccessful
})?;

let mut position = range.start;
let mut buffer = [0u8; BUF_SIZE];
let mut buffer = Vec::with_capacity(BUF_SIZE);

loop {
let mut bytes_read = reader.read(&mut buffer).map_err(|e| {
log::warn!("failed to read file: {}", e);
CloudErrorKind::Unsuccessful
})?;
let mut bytes_read = reader
.read_into(
&mut buffer,
position..min(range.end, position + BUF_SIZE as u64),
)
.await
.map_err(|e| {
log::warn!("failed to read file: {}", e);
CloudErrorKind::Unsuccessful
})?;

let unaligned = bytes_read % 4096;
if unaligned != 0 && position + (bytes_read as u64) < range.end {
bytes_read -= unaligned;
reader
.seek(SeekFrom::Current(-(unaligned as i64)))
.map_err(|e| {
log::warn!("failed to seek file: {}", e);
CloudErrorKind::Unsuccessful
})?;
}

ticket
Expand All @@ -117,6 +110,8 @@ impl SyncFilter for CloudFilter {
break;
}

buffer.clear();

ticket.report_progress(range.end, position).map_err(|e| {
log::warn!("failed to report progress: {}", e);
CloudErrorKind::Unsuccessful
Expand All @@ -126,7 +121,7 @@ impl SyncFilter for CloudFilter {
Ok(())
}

fn fetch_placeholders(
async fn fetch_placeholders(
&self,
request: Request,
ticket: ticket::FetchPlaceholders,
Expand All @@ -146,12 +141,12 @@ impl SyncFilter for CloudFilter {
.op
.lister_with(&remote_path.to_string_lossy().replace('\\', "/"))
.metakey(Metakey::LastModified | Metakey::ContentLength)
.call()
.await
.map_err(|e| {
log::warn!("failed to list files: {}", e);
CloudErrorKind::Unsuccessful
})?
.filter_map(|e| {
.filter_map(|e| async {
let entry = e.ok()?;
let metadata = entry.metadata();
let entry_remote_path = PathBuf::from(entry.path());
Expand Down Expand Up @@ -183,7 +178,8 @@ impl SyncFilter for CloudFilter {
)
})
})
.collect::<Vec<_>>();
.collect::<Vec<_>>()
.await;

_ = ticket.pass_with_placeholder(&mut entries).map_err(|e| {
log::warn!("failed to pass placeholder: {e:?}");
Expand All @@ -193,12 +189,11 @@ impl SyncFilter for CloudFilter {
}
}

/// Checks if the entry is in sync.
/// Checks if the entry is in sync, then convert to placeholder.
///
/// Returns `true` if the entry is not exists, `false` otherwise.
fn check_in_sync(entry: &Entry, root: &Path) -> bool {
let absolute = root.join(entry.path());
println!("absolute: {}", absolute.display());

let Ok(metadata) = fs::metadata(&absolute) else {
return true;
Expand Down

0 comments on commit f17b508

Please sign in to comment.