Skip to content

Commit

Permalink
[#6012] feat (gvfs-fuse): Support Gravitino S3 fileset filesystem ope…
Browse files Browse the repository at this point in the history
…ration in gvfs fuse (#6013)

### What changes were proposed in this pull request?

Support a Gravitino S3 fileset filesystem operation in gvfs fuse,
implemented by OpenDal

### Why are the changes needed?

Fix: #6012 

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Manually test

---------

Co-authored-by: Qiming Teng <[email protected]>
  • Loading branch information
diqiu50 and tengqm authored Jan 3, 2025
1 parent 8dcc7b8 commit ef2b102
Show file tree
Hide file tree
Showing 22 changed files with 1,202 additions and 231 deletions.
1 change: 1 addition & 0 deletions clients/filesystem-fuse/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ futures-util = "0.3.30"
libc = "0.2.168"
log = "0.4.22"
once_cell = "1.20.2"
opendal = { version = "0.46.0", features = ["services-s3"] }
reqwest = { version = "0.12.9", features = ["json"] }
serde = { version = "1.0.216", features = ["derive"] }
tokio = { version = "1.38.0", features = ["full"] }
Expand Down
6 changes: 3 additions & 3 deletions clients/filesystem-fuse/conf/gvfs_fuse.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ block_size = 8192
uri = "http://localhost:8090"
metalake = "your_metalake"

# extent settings
# extend settings
[extend_config]
access_key = "your access_key"
secret_key = "your_secret_key"
s3-access_key_id = "your access_key"
s3-secret_access_key = "your_secret_key"
6 changes: 3 additions & 3 deletions clients/filesystem-fuse/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,18 +302,18 @@ mod test {

#[test]
fn test_config_from_file() {
let config = AppConfig::from_file(Some("tests/conf/gvfs_fuse_test.toml")).unwrap();
let config = AppConfig::from_file(Some("tests/conf/config_test.toml")).unwrap();
assert_eq!(config.fuse.file_mask, 0o644);
assert_eq!(config.fuse.dir_mask, 0o755);
assert_eq!(config.filesystem.block_size, 8192);
assert_eq!(config.gravitino.uri, "http://localhost:8090");
assert_eq!(config.gravitino.metalake, "test");
assert_eq!(
config.extend_config.get("access_key"),
config.extend_config.get("s3-access_key_id"),
Some(&"XXX_access_key".to_string())
);
assert_eq!(
config.extend_config.get("secret_key"),
config.extend_config.get("s3-secret_access_key"),
Some(&"XXX_secret_key".to_string())
);
}
Expand Down
98 changes: 77 additions & 21 deletions clients/filesystem-fuse/src/default_raw_filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
*/
use crate::config::AppConfig;
use crate::filesystem::{
FileStat, FileSystemContext, PathFileSystem, RawFileSystem, Result, INITIAL_FILE_ID,
ROOT_DIR_FILE_ID, ROOT_DIR_PARENT_FILE_ID, ROOT_DIR_PATH,
FileStat, FileSystemContext, PathFileSystem, RawFileSystem, Result, FS_META_FILE_ID,
FS_META_FILE_NAME, FS_META_FILE_PATH, INITIAL_FILE_ID, ROOT_DIR_FILE_ID,
ROOT_DIR_PARENT_FILE_ID, ROOT_DIR_PATH,
};
use crate::opened_file::{FileHandle, OpenFileFlags};
use crate::opened_file::{FileHandle, OpenFileFlags, OpenedFile};
use crate::opened_file_manager::OpenedFileManager;
use async_trait::async_trait;
use bytes::Bytes;
Expand Down Expand Up @@ -78,6 +79,7 @@ impl<T: PathFileSystem> DefaultRawFileSystem<T> {
}

async fn resolve_file_id_to_filestat(&self, file_stat: &mut FileStat, parent_file_id: u64) {
debug_assert!(parent_file_id != 0);
let mut file_manager = self.file_entry_manager.write().await;
let file_entry = file_manager.get_file_entry_by_path(&file_stat.path);
match file_entry {
Expand Down Expand Up @@ -132,6 +134,21 @@ impl<T: PathFileSystem> DefaultRawFileSystem<T> {
let mut file_manager = self.file_entry_manager.write().await;
file_manager.insert(parent_file_id, file_id, path);
}

fn get_meta_file_stat(&self) -> FileStat {
let mut meta_file_stat =
FileStat::new_file_filestat_with_path(Path::new(FS_META_FILE_PATH), 0);
meta_file_stat.set_file_id(ROOT_DIR_FILE_ID, FS_META_FILE_ID);
meta_file_stat
}

fn is_meta_file(&self, file_id: u64) -> bool {
file_id == FS_META_FILE_ID
}

fn is_meta_file_name(&self, parent_file_id: u64, name: &OsStr) -> bool {
parent_file_id == ROOT_DIR_FILE_ID && name == OsStr::new(FS_META_FILE_NAME)
}
}

#[async_trait]
Expand All @@ -144,6 +161,13 @@ impl<T: PathFileSystem> RawFileSystem for DefaultRawFileSystem<T> {
Path::new(ROOT_DIR_PATH),
)
.await;

self.insert_file_entry_locked(
ROOT_DIR_FILE_ID,
FS_META_FILE_ID,
Path::new(FS_META_FILE_PATH),
)
.await;
self.fs.init().await
}

Expand All @@ -168,15 +192,22 @@ impl<T: PathFileSystem> RawFileSystem for DefaultRawFileSystem<T> {
}

async fn stat(&self, file_id: u64) -> Result<FileStat> {
if self.is_meta_file(file_id) {
return Ok(self.get_meta_file_stat());
}

let file_entry = self.get_file_entry(file_id).await?;
let mut file_stat = self.fs.stat(&file_entry.path).await?;
file_stat.set_file_id(file_entry.parent_file_id, file_entry.file_id);
Ok(file_stat)
}

async fn lookup(&self, parent_file_id: u64, name: &OsStr) -> Result<FileStat> {
let parent_file_entry = self.get_file_entry(parent_file_id).await?;
if self.is_meta_file_name(parent_file_id, name) {
return Ok(self.get_meta_file_stat());
}

let parent_file_entry = self.get_file_entry(parent_file_id).await?;
let path = parent_file_entry.path.join(name);
let mut file_stat = self.fs.stat(&path).await?;
// fill the file id to file stat
Expand All @@ -192,10 +223,21 @@ impl<T: PathFileSystem> RawFileSystem for DefaultRawFileSystem<T> {
for file_stat in child_filestats.iter_mut() {
self.resolve_file_id_to_filestat(file_stat, file_id).await;
}

if file_id == ROOT_DIR_FILE_ID {
child_filestats.push(self.get_meta_file_stat());
}
Ok(child_filestats)
}

async fn open_file(&self, file_id: u64, flags: u32) -> Result<FileHandle> {
if self.is_meta_file(file_id) {
let meta_file = OpenedFile::new(self.get_meta_file_stat());
let resutl = self.opened_file_manager.put(meta_file);
let file = resutl.lock().await;
return Ok(file.file_handle());
}

self.open_file_internal(file_id, flags, FileType::RegularFile)
.await
}
Expand All @@ -211,6 +253,10 @@ impl<T: PathFileSystem> RawFileSystem for DefaultRawFileSystem<T> {
name: &OsStr,
flags: u32,
) -> Result<FileHandle> {
if self.is_meta_file_name(parent_file_id, name) {
return Err(Errno::from(libc::EEXIST));
}

let parent_file_entry = self.get_file_entry(parent_file_id).await?;
let mut file_without_id = self
.fs
Expand Down Expand Up @@ -247,11 +293,19 @@ impl<T: PathFileSystem> RawFileSystem for DefaultRawFileSystem<T> {
}

async fn set_attr(&self, file_id: u64, file_stat: &FileStat) -> Result<()> {
if self.is_meta_file(file_id) {
return Ok(());
}

let file_entry = self.get_file_entry(file_id).await?;
self.fs.set_attr(&file_entry.path, file_stat, true).await
}

async fn remove_file(&self, parent_file_id: u64, name: &OsStr) -> Result<()> {
if self.is_meta_file_name(parent_file_id, name) {
return Err(Errno::from(libc::EPERM));
}

let parent_file_entry = self.get_file_entry(parent_file_id).await?;
let path = parent_file_entry.path.join(name);
self.fs.remove_file(&path).await?;
Expand All @@ -271,6 +325,15 @@ impl<T: PathFileSystem> RawFileSystem for DefaultRawFileSystem<T> {
Ok(())
}

async fn flush_file(&self, _file_id: u64, fh: u64) -> Result<()> {
let opened_file = self
.opened_file_manager
.get(fh)
.ok_or(Errno::from(libc::EBADF))?;
let mut file = opened_file.lock().await;
file.flush().await
}

async fn close_file(&self, _file_id: u64, fh: u64) -> Result<()> {
let opened_file = self
.opened_file_manager
Expand All @@ -280,7 +343,11 @@ impl<T: PathFileSystem> RawFileSystem for DefaultRawFileSystem<T> {
file.close().await
}

async fn read(&self, _file_id: u64, fh: u64, offset: u64, size: u32) -> Result<Bytes> {
async fn read(&self, file_id: u64, fh: u64, offset: u64, size: u32) -> Result<Bytes> {
if self.is_meta_file(file_id) {
return Ok(Bytes::new());
}

let (data, file_stat) = {
let opened_file = self
.opened_file_manager
Expand All @@ -297,7 +364,11 @@ impl<T: PathFileSystem> RawFileSystem for DefaultRawFileSystem<T> {
data
}

async fn write(&self, _file_id: u64, fh: u64, offset: u64, data: &[u8]) -> Result<u32> {
async fn write(&self, file_id: u64, fh: u64, offset: u64, data: &[u8]) -> Result<u32> {
if self.is_meta_file(file_id) {
return Err(Errno::from(libc::EPERM));
}

let (len, file_stat) = {
let opened_file = self
.opened_file_manager
Expand Down Expand Up @@ -368,8 +439,6 @@ impl FileEntryManager {
#[cfg(test)]
mod tests {
use super::*;
use crate::filesystem::tests::TestRawFileSystem;
use crate::memory_filesystem::MemoryFileSystem;

#[test]
fn test_file_entry_manager() {
Expand All @@ -389,17 +458,4 @@ mod tests {
assert!(manager.get_file_entry_by_id(2).is_none());
assert!(manager.get_file_entry_by_path(Path::new("a/b")).is_none());
}

#[tokio::test]
async fn test_default_raw_file_system() {
let memory_fs = MemoryFileSystem::new().await;
let raw_fs = DefaultRawFileSystem::new(
memory_fs,
&AppConfig::default(),
&FileSystemContext::default(),
);
let _ = raw_fs.init().await;
let mut tester = TestRawFileSystem::new(raw_fs);
tester.test_raw_file_system().await;
}
}
2 changes: 2 additions & 0 deletions clients/filesystem-fuse/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub enum ErrorCode {
GravitinoClientError,
InvalidConfig,
ConfigNotFound,
OpenDalError,
}

impl ErrorCode {
Expand All @@ -39,6 +40,7 @@ impl std::fmt::Display for ErrorCode {
ErrorCode::GravitinoClientError => write!(f, "Gravitino client error"),
ErrorCode::InvalidConfig => write!(f, "Invalid config"),
ErrorCode::ConfigNotFound => write!(f, "Config not found"),
ErrorCode::OpenDalError => write!(f, "OpenDal error"),
}
}
}
Expand Down
Loading

0 comments on commit ef2b102

Please sign in to comment.