Skip to content

refact(async-vfs): move to tokio #74

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
798 changes: 157 additions & 641 deletions Cargo.lock

Large diffs are not rendered by default.

37 changes: 26 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,39 @@ travis-ci = { repository = "manuel-woelker/rust-vfs", branch = "master" }

[dependencies]
rust-embed = { version = "8.0.0", optional = true }
async-std = { version = "1.12.0", optional = true }
async-trait = { version = "0.1.73", optional = true}
tokio = { version = "1.29.0", features = ["macros", "rt"], optional = true}
futures = {version = "0.3.28", optional = true}
async-recursion = {version = "1.0.5", optional = true}
filetime = "0.2.23"
camino = { version = "1.0.5", optional = true }
async-trait = { version = "0.1.73", optional = true }
tokio = { version = "1.38", features = [
"macros",
"rt",
"sync",
"fs",
"io-util",
], optional = true } # MSRV 1.65
tokio-stream = { version = "0.1.2", features = ["fs"], optional = true }

futures = { version = "0.3.28", optional = true }
async-recursion = { version = "1.1", optional = true }
filetime = "0.2.25"
camino = { version = "1.1.9", optional = true }

[dev-dependencies]
uuid = { version = "=0.8.1", features = ["v4"] }
camino = "1.0.5"
anyhow = "1.0.58"
tokio-test = "0.4.3"
anyhow = "1.0"
tokio-test = "0.4"

[features]
default = []

embedded-fs = ["rust-embed"]
async-vfs = ["tokio", "async-std", "async-trait", "futures", "async-recursion"]
export-test-macros = [ "camino" ]
async-vfs = [
"dep:tokio",
"dep:tokio-stream",
"dep:async-trait",
"dep:futures",
"dep:async-recursion",
]
export-test-macros = ["camino"]

[package.metadata.docs.rs]
features = ["embedded-fs", "async-vfs"]
8 changes: 4 additions & 4 deletions src/async_vfs/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ use crate::async_vfs::{AsyncVfsPath, SeekAndRead};
use crate::error::VfsErrorKind;
use crate::{VfsError, VfsMetadata, VfsResult};

use async_std::io::Write;
use async_std::stream::Stream;
use async_trait::async_trait;
use futures::stream::Stream;
use std::fmt::Debug;
use std::time::SystemTime;
use tokio::io::AsyncWrite;

/// File system implementations must implement this trait
/// All path parameters are absolute, starting with '/', except for the root directory
Expand All @@ -32,9 +32,9 @@ pub trait AsyncFileSystem: Debug + Sync + Send + 'static {
/// Opens the file at this path for reading
async fn open_file(&self, path: &str) -> VfsResult<Box<dyn SeekAndRead + Send + Unpin>>;
/// Creates a file at this path for writing
async fn create_file(&self, path: &str) -> VfsResult<Box<dyn Write + Send + Unpin>>;
async fn create_file(&self, path: &str) -> VfsResult<Box<dyn AsyncWrite + Send + Unpin>>;
/// Opens the file at this path for appending
async fn append_file(&self, path: &str) -> VfsResult<Box<dyn Write + Send + Unpin>>;
async fn append_file(&self, path: &str) -> VfsResult<Box<dyn AsyncWrite + Send + Unpin>>;
/// Returns the file metadata for the file at this path
async fn metadata(&self, path: &str) -> VfsResult<VfsMetadata>;
/// Sets the files creation timestamp, if the implementation supports it
Expand Down
8 changes: 4 additions & 4 deletions src/async_vfs/impls/altroot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use crate::async_vfs::{AsyncFileSystem, AsyncVfsPath, SeekAndRead};
use crate::{error::VfsErrorKind, VfsMetadata, VfsResult};
use std::time::SystemTime;

use async_std::io::Write;
use async_trait::async_trait;
use futures::stream::{Stream, StreamExt};
use tokio::io::AsyncWrite;

/// Similar to a chroot but done purely by path manipulation
///
Expand Down Expand Up @@ -60,11 +60,11 @@ impl AsyncFileSystem for AsyncAltrootFS {
self.path(path)?.open_file().await
}

async fn create_file(&self, path: &str) -> VfsResult<Box<dyn Write + Send + Unpin>> {
async fn create_file(&self, path: &str) -> VfsResult<Box<dyn AsyncWrite + Send + Unpin>> {
self.path(path)?.create_file().await
}

async fn append_file(&self, path: &str) -> VfsResult<Box<dyn Write + Send + Unpin>> {
async fn append_file(&self, path: &str) -> VfsResult<Box<dyn AsyncWrite + Send + Unpin>> {
self.path(path)?.append_file().await
}

Expand Down Expand Up @@ -135,7 +135,7 @@ mod tests_physical {
use super::*;
use crate::async_vfs::AsyncPhysicalFS;

use async_std::io::ReadExt;
use tokio::io::AsyncReadExt;

test_async_vfs!(futures::executor::block_on(async {
let temp_dir = std::env::temp_dir();
Expand Down
85 changes: 48 additions & 37 deletions src/async_vfs/impls/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,23 @@ use crate::error::VfsErrorKind;
use crate::path::VfsFileType;
use crate::{VfsMetadata, VfsResult};

use async_std::io::{prelude::SeekExt, Cursor, Read, Seek, SeekFrom, Write};
use async_std::sync::{Arc, RwLock};
use async_trait::async_trait;
use futures::task::{Context, Poll};
use futures::{Stream, StreamExt};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt;
use std::fmt::{Debug, Formatter};
use std::io::Cursor;
use std::mem::swap;
use std::pin::Pin;
use std::sync::Arc;

use tokio::io::AsyncSeek;
use tokio::io::AsyncSeekExt;
use tokio::io::ReadBuf;
use tokio::io::{AsyncRead, AsyncWrite, SeekFrom};
use tokio::sync::RwLock;

type AsyncMemoryFsHandle = Arc<RwLock<AsyncMemoryFsImpl>>;

Expand Down Expand Up @@ -60,12 +66,12 @@ struct AsyncWritableFile {
fs: AsyncMemoryFsHandle,
}

impl Write for AsyncWritableFile {
impl AsyncWrite for AsyncWritableFile {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, async_std::io::Error>> {
) -> Poll<Result<usize, tokio::io::Error>> {
let this = self.get_mut();
let file = Pin::new(&mut this.content);
file.poll_write(cx, buf)
Expand All @@ -74,18 +80,18 @@ impl Write for AsyncWritableFile {
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), async_std::io::Error>> {
) -> Poll<Result<(), tokio::io::Error>> {
let this = self.get_mut();
let file = Pin::new(&mut this.content);
file.poll_flush(cx)
}
fn poll_close(
fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), async_std::io::Error>> {
) -> Poll<Result<(), tokio::io::Error>> {
let this = self.get_mut();
let file = Pin::new(&mut this.content);
file.poll_close(cx)
file.poll_shutdown(cx)
}
}

Expand All @@ -106,7 +112,7 @@ impl Drop for AsyncWritableFile {
struct AsyncReadableFile {
#[allow(clippy::rc_buffer)] // to allow accessing the same object as writable
content: Arc<Vec<u8>>,
// Position of the read cursor in the "file"
/// Position of the read cursor in the "file"
cursor_pos: u64,
}

Expand All @@ -116,48 +122,52 @@ impl AsyncReadableFile {
}
}

impl Read for AsyncReadableFile {
impl AsyncRead for AsyncReadableFile {
fn poll_read(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize, async_std::io::Error>> {
buf: &mut ReadBuf<'_>,
) -> Poll<Result<(), tokio::io::Error>> {
let this = self.get_mut();
let bytes_left = this.len() - this.cursor_pos;
let bytes_read = std::cmp::min(buf.len() as u64, bytes_left);

if bytes_left == 0 {
return Poll::Ready(Ok(0));
return Poll::Ready(Ok(()));
}
buf[..bytes_read as usize].copy_from_slice(

let bytes_read = std::cmp::min(buf.capacity() as u64, bytes_left);
buf.put_slice(
&this.content[this.cursor_pos as usize..(this.cursor_pos + bytes_read) as usize],
);
this.cursor_pos += bytes_read;
Poll::Ready(Ok(bytes_read as usize))

Poll::Ready(Ok(()))
}
}

impl Seek for AsyncReadableFile {
fn poll_seek(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<Result<u64, async_std::io::Error>> {
impl AsyncSeek for AsyncReadableFile {
fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> Result<(), tokio::io::Error> {
let this = self.get_mut();
let new_pos = match pos {
SeekFrom::Start(offset) => offset as i64,
SeekFrom::End(offset) => this.cursor_pos as i64 - offset,
SeekFrom::Current(offset) => this.cursor_pos as i64 + offset,
};
if new_pos < 0 || new_pos >= this.len() as i64 {
Poll::Ready(Err(async_std::io::Error::new(
async_std::io::ErrorKind::InvalidData,
Err(tokio::io::Error::new(
tokio::io::ErrorKind::InvalidData,
"Requested offset is outside the file!",
)))
))
} else {
this.cursor_pos = new_pos as u64;
Poll::Ready(Ok(new_pos as u64))
Ok(())
}
}

fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
let this = self.get_mut();
Poll::Ready(Ok(this.cursor_pos))
}
}

#[async_trait]
Expand Down Expand Up @@ -226,7 +236,7 @@ impl AsyncFileSystem for AsyncMemoryFS {
}))
}

async fn create_file(&self, path: &str) -> VfsResult<Box<dyn Write + Send + Unpin>> {
async fn create_file(&self, path: &str) -> VfsResult<Box<dyn AsyncWrite + Send + Unpin>> {
self.ensure_has_parent(path).await?;
let content = Arc::new(Vec::<u8>::new());
self.handle.write().await.files.insert(
Expand All @@ -244,7 +254,7 @@ impl AsyncFileSystem for AsyncMemoryFS {
Ok(Box::new(writer))
}

async fn append_file(&self, path: &str) -> VfsResult<Box<dyn Write + Send + Unpin>> {
async fn append_file(&self, path: &str) -> VfsResult<Box<dyn AsyncWrite + Send + Unpin>> {
let handle = self.handle.write().await;
let file = handle.files.get(path).ok_or(VfsErrorKind::FileNotFound)?;
let mut content = Cursor::new(file.content.as_ref().clone());
Expand Down Expand Up @@ -327,28 +337,29 @@ struct AsyncMemoryFile {
mod tests {
use super::*;
use crate::async_vfs::AsyncVfsPath;
use async_std::io::{ReadExt, WriteExt};

use tokio::io::{AsyncReadExt, AsyncWriteExt};
test_async_vfs!(AsyncMemoryFS::new());

#[tokio::test]
async fn write_and_read_file() -> VfsResult<()> {
let root = AsyncVfsPath::new(AsyncMemoryFS::new());
let path = root.join("foobar.txt").unwrap();
let path = root.join("foobar.txt")?;
let _send = &path as &dyn Send;
{
let mut file = path.create_file().await.unwrap();
write!(file, "Hello world").await.unwrap();
write!(file, "!").await.unwrap();
let mut file = path.create_file().await?;
file.write_all(b"Hello world").await?;
file.write_all(b"!").await?;
}
{
let mut file = path.open_file().await.unwrap();
let mut file = path.open_file().await?;
let mut string: String = String::new();
file.read_to_string(&mut string).await.unwrap();
file.read_to_string(&mut string).await?;
assert_eq!(string, "Hello world!");
}
assert!(path.exists().await?);
assert!(!root.join("foo").unwrap().exists().await?);
let metadata = path.metadata().await.unwrap();
assert!(!root.join("foo")?.exists().await?);
let metadata = path.metadata().await?;
assert_eq!(metadata.len, 12);
assert_eq!(metadata.file_type, VfsFileType::File);
Ok(())
Expand Down
12 changes: 6 additions & 6 deletions src/async_vfs/impls/overlay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ use crate::async_vfs::{AsyncFileSystem, AsyncVfsPath, SeekAndRead};
use crate::error::VfsErrorKind;
use crate::{VfsMetadata, VfsResult};

use async_std::io::Write;
use async_trait::async_trait;
use futures::stream::{Stream, StreamExt};
use std::collections::HashSet;
use std::time::SystemTime;
use tokio::io::AsyncWrite;

/// An overlay file system combining several filesystems into one, an upper layer with read/write access and lower layers with only read access
///
Expand Down Expand Up @@ -132,7 +132,7 @@ impl AsyncFileSystem for AsyncOverlayFS {
self.read_path(path).await?.open_file().await
}

async fn create_file(&self, path: &str) -> VfsResult<Box<dyn Write + Send + Unpin>> {
async fn create_file(&self, path: &str) -> VfsResult<Box<dyn AsyncWrite + Send + Unpin>> {
self.ensure_has_parent(path).await?;
let result = self.write_path(path)?.create_file().await?;
let whiteout_path = self.whiteout_path(path)?;
Expand All @@ -142,7 +142,7 @@ impl AsyncFileSystem for AsyncOverlayFS {
Ok(result)
}

async fn append_file(&self, path: &str) -> VfsResult<Box<dyn Write + Send + Unpin>> {
async fn append_file(&self, path: &str) -> VfsResult<Box<dyn AsyncWrite + Send + Unpin>> {
let write_path = self.write_path(path)?;
if !write_path.exists().await? {
self.ensure_has_parent(path).await?;
Expand Down Expand Up @@ -214,8 +214,8 @@ mod tests {
use super::*;
use crate::async_vfs::AsyncMemoryFS;

use async_std::io::WriteExt;
use futures::stream::StreamExt;
use tokio::io::AsyncWriteExt;

test_async_vfs!({
let upper_root: AsyncVfsPath = AsyncMemoryFS::new().into();
Expand Down Expand Up @@ -436,9 +436,9 @@ mod tests_physical {
let temp_dir = std::env::temp_dir();
let dir = temp_dir.join(uuid::Uuid::new_v4().to_string());
let lower_path = dir.join("lower");
async_std::fs::create_dir_all(&lower_path).await.unwrap();
tokio::fs::create_dir_all(&lower_path).await.unwrap();
let upper_path = dir.join("upper");
async_std::fs::create_dir_all(&upper_path).await.unwrap();
tokio::fs::create_dir_all(&upper_path).await.unwrap();

let upper_root: AsyncVfsPath = AsyncPhysicalFS::new(upper_path).into();
let lower_root: AsyncVfsPath = AsyncPhysicalFS::new(lower_path).into();
Expand Down
Loading
Loading