Feat: Add zstd support #9

Open
wants to merge 6 commits into
base: main
Changes from 3 commits
5 changes: 4 additions & 1 deletion Cargo.toml
@@ -17,9 +17,12 @@ keywords = ["utilities"]
 [dependencies]
 thiserror = "^1"

-# For auto-gzip handing of files
+# For auto-gzip handling of files
 flate2 = "^1"

+# For auto-zstd handling of files
+zstd = "0.12.4"
+
 # For auto-serialization of structs to csv/tsv
 csv = "^1"
 serde = { version = "^1.0.123", features = ["derive"] }
2 changes: 1 addition & 1 deletion rust-toolchain.toml
@@ -1,3 +1,3 @@
 [toolchain]
-channel = "1.58.1"
+channel = "1.71.1"
 components = ["rustfmt", "clippy"]
52 changes: 47 additions & 5 deletions src/io/mod.rs
@@ -4,7 +4,7 @@
 //! I/O activities, such a slurping a file by lines, or writing a collection of `Serializable`
 //! objects to a path.
 //!
-//! The two core parts of this module are teh [`Io`] and [`DelimFile`] structs. These structs provide
+//! The two core parts of this module are the [`Io`] and [`DelimFile`] structs. These structs provide
 //! methods for reading and writing to files that transparently handle compression based on the
 //! file extension of the path given to the methods.
 //!
@@ -51,10 +51,14 @@ use flate2::bufread::MultiGzDecoder;
 use flate2::write::GzEncoder;
 use flate2::Compression;
 use serde::{de::DeserializeOwned, Serialize};
+use zstd::stream::{Decoder as ZstdDecoder, Encoder as ZstdEncoder};

 /// The set of file extensions to treat as GZIPPED
 const GZIP_EXTENSIONS: [&str; 2] = ["gz", "bgz"];

+/// The set of file extensions to treat as ZSTD compressed
+const ZSTD_EXTENSIONS: [&str; 1] = ["zst"];
+
 /// The default buffer size when creating buffered readers/writers
 const BUFFER_SIZE: usize = 64 * 1024;

@@ -90,8 +94,19 @@ impl Io {
         }
     }

-    /// Opens a file for reading. Transparently handles reading gzipped files based
-    /// extension.
+    /// Returns true if the path ends with a recognized ZSTD file extension
+    fn is_zstd_path<P: AsRef<Path>>(p: &P) -> bool {
+        if let Some(ext) = p.as_ref().extension() {
+            match ext.to_str() {
+                Some(x) => ZSTD_EXTENSIONS.contains(&x),
+                None => false,
+            }
+        } else {
+            false
+        }
+    }
+
+    /// Opens a file for reading. Transparently handles decoding gzip and zstd files.
     pub fn new_reader<P>(&self, p: &P) -> Result<Box<dyn BufRead + Send>>
     where
         P: AsRef<Path>,
@@ -101,20 +116,23 @@ impl Io {

         if Self::is_gzip_path(p) {
             Ok(Box::new(BufReader::with_capacity(self.buffer_size, MultiGzDecoder::new(buf))))
+        } else if Self::is_zstd_path(p) {
+            Ok(Box::new(BufReader::with_capacity(self.buffer_size, ZstdDecoder::new(buf).unwrap())))
kockan marked this conversation as resolved.
         } else {
             Ok(Box::new(buf))
         }
     }

-    /// Opens a file for writing. Transparently handles writing GZIP'd data if the file
-    /// ends with a recognized GZIP extension.
+    /// Opens a file for writing. Transparently handles encoding data in gzip and zstd formats.
     pub fn new_writer<P>(&self, p: &P) -> Result<BufWriter<Box<dyn Write + Send>>>
     where
         P: AsRef<Path>,
     {
         let file = File::create(p).map_err(FgError::IoError)?;
         let write: Box<dyn Write + Send> = if Io::is_gzip_path(p) {
             Box::new(GzEncoder::new(file, self.compression))
+        } else if Io::is_zstd_path(p) {
+            Box::new(ZstdEncoder::new(file, 0).unwrap().auto_finish())
Member

Same comment here about the unwrap().
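
For illustration, a minimal sketch of what replacing the `unwrap()` could look like: the constructor's `io::Error` is mapped into the crate error and propagated with `?`, mirroring the `File::create(p).map_err(FgError::IoError)?` line already in the diff. The `FgError` definition and `open_decoder` below are hypothetical stand-ins so the example compiles without the `zstd` crate; only the `IoError` variant name comes from the diff.

```rust
use std::io::{self, BufRead, BufReader, Read};

/// Hypothetical stand-in for the crate's error type. Only the `IoError`
/// variant name is taken from the diff; the rest is assumed.
#[derive(Debug)]
pub enum FgError {
    IoError(io::Error),
}

/// Hypothetical stand-in for a fallible decoder constructor. It returns
/// `io::Result`, as `ZstdDecoder::new` does, and fails on demand.
fn open_decoder<R: Read>(inner: R, fail: bool) -> io::Result<R> {
    if fail {
        Err(io::Error::new(io::ErrorKind::Other, "decoder setup failed"))
    } else {
        Ok(inner)
    }
}

/// Builds a boxed reader and converts a constructor failure into an
/// `FgError` rather than panicking.
pub fn new_reader<R>(inner: R, fail: bool) -> Result<Box<dyn BufRead + Send>, FgError>
where
    R: Read + Send + 'static,
{
    let decoder = open_decoder(inner, fail).map_err(FgError::IoError)?;
    Ok(Box::new(BufReader::new(decoder)))
}
```

The same pattern would presumably apply to the encoder call in `new_writer`, since that constructor is unwrapped in the same way.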

Member

I don't know enough about zstd compression, but I'm not sure just specifying 0 here (which appears to then use the default, level 3, compression) is the right thing to do, as it gives the user zero control.

On the other hand, re-using the compression passed in above is problematic as gzip and zstd have very different compression level ranges (0-9 vs. -17 to 22), and introducing a new parameter to Io::new() for zstd_level would be a breaking change.

Thoughts @nh13?

Author

How about something like:

[Image: Screenshot 2023-08-18 at 12 26 25 PM]

Not sure if it's the proper way to go about it but it's one possibility if it's preferred that the Io struct absolutely doesn't change.

Member

I could see having a member per level, but then what about bzip2 and future compression levels? And perhaps we have compression that requires not just a level? And certainly we don't want to add a translation layer that goes from 0-10 levels for each compression type.

What about instead of storing compression as a member of Io, we have a Map/Vec that stores a compression-specific struct (can impl a trait). You could even make is_gzip_path specific to the compression as a trait-level method (is_path_for), so a new compression need only impl the base trait to be added. Then we avoid the if/else if/else chain there and it can just be a loop (with a default if it exits the loop)? Does that make sense?
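
A rough sketch of how that suggestion might look, under stated assumptions: the `Codec` trait, `ToyCodec`, `Registry`, and `default_registry` names are all hypothetical and not part of the crate, and the toy codec only prepends a marker instead of compressing so that the example needs no external crates. The point is the shape: each codec carries its own settings and its own `is_path_for`, and the writer lookup becomes a loop with a fallthrough default.

```rust
use std::io::{self, Write};
use std::path::Path;

/// Hypothetical trait: one implementation per compression format.
pub trait Codec: Send + Sync {
    /// Human-readable codec name.
    fn name(&self) -> &str;

    /// File extensions (without the dot) claimed by this codec.
    fn extensions(&self) -> &[&str];

    /// Wraps a raw writer in this codec's encoder.
    fn wrap_writer(&self, inner: Box<dyn Write + Send>) -> io::Result<Box<dyn Write + Send>>;

    /// Returns true if the path's extension belongs to this codec.
    fn is_path_for(&self, p: &Path) -> bool {
        p.extension()
            .and_then(|ext| ext.to_str())
            .map_or(false, |ext| self.extensions().contains(&ext))
    }
}

/// Toy codec standing in for a real encoder. It writes a marker with its
/// own level and then passes data through unchanged.
pub struct ToyCodec {
    name: &'static str,
    exts: &'static [&'static str],
    /// Codec-specific setting; a real codec would validate its own range.
    level: i32,
}

impl Codec for ToyCodec {
    fn name(&self) -> &str {
        self.name
    }

    fn extensions(&self) -> &[&str] {
        self.exts
    }

    fn wrap_writer(&self, mut inner: Box<dyn Write + Send>) -> io::Result<Box<dyn Write + Send>> {
        write!(inner, "level={};", self.level)?;
        Ok(inner)
    }
}

/// Holds the registered codecs; adding a format means adding one entry.
pub struct Registry {
    codecs: Vec<Box<dyn Codec>>,
}

impl Registry {
    /// Returns the first codec that claims the path, if any.
    pub fn codec_for(&self, p: &Path) -> Option<&dyn Codec> {
        for codec in &self.codecs {
            if codec.is_path_for(p) {
                return Some(&**codec);
            }
        }
        None
    }

    /// Wraps the raw writer with the matching codec, or returns it as-is.
    pub fn new_writer(&self, p: &Path, raw: Box<dyn Write + Send>) -> io::Result<Box<dyn Write + Send>> {
        match self.codec_for(p) {
            Some(codec) => codec.wrap_writer(raw),
            None => Ok(raw),
        }
    }
}

/// Example registry with assumed default levels for each toy codec.
pub fn default_registry() -> Registry {
    Registry {
        codecs: vec![
            Box::new(ToyCodec { name: "toy-gzip", exts: &["gz", "bgz"], level: 6 }),
            Box::new(ToyCodec { name: "toy-zstd", exts: &["zst"], level: 3 }),
        ],
    }
}
```

With this layout no shared level scale is needed, because each codec struct owns whatever parameters its format requires.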

Author

Makes sense. As far as I can see, the Io struct will probably have to change at some point if further compression schemes are to be supported. #10 might also be a good solution to this. I haven't checked it out thoroughly yet, but it does feel like a good idea to actually check the file format instead of just the extension, although we might still want a way to compare the detected format against the extension to detect any inconsistencies.
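
As a sketch of the content-versus-extension check mentioned above (not taken from #10, whose approach is not shown here): gzip streams begin with the bytes `1f 8b`, and a standard zstd frame begins with `28 b5 2f fd`. The `Format` enum and the function names are hypothetical, and the check ignores less common cases such as zstd skippable frames.

```rust
use std::path::Path;

/// Hypothetical format tag used only for this sketch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Format {
    Gzip,
    Zstd,
    Plain,
}

/// Leading bytes of a gzip member.
const GZIP_MAGIC: [u8; 2] = [0x1f, 0x8b];
/// Leading bytes of a standard zstd frame (0xFD2FB528, little-endian).
const ZSTD_MAGIC: [u8; 4] = [0x28, 0xb5, 0x2f, 0xfd];

/// Guesses the format from the first bytes of a file.
pub fn sniff_format(head: &[u8]) -> Format {
    if head.starts_with(&GZIP_MAGIC) {
        Format::Gzip
    } else if head.starts_with(&ZSTD_MAGIC) {
        Format::Zstd
    } else {
        Format::Plain
    }
}

/// Guesses the format from the extension, using the extensions in the diff.
pub fn format_from_extension(p: &Path) -> Format {
    match p.extension().and_then(|e| e.to_str()) {
        Some("gz") | Some("bgz") => Format::Gzip,
        Some("zst") => Format::Zstd,
        _ => Format::Plain,
    }
}

/// Returns true when the sniffed content agrees with the extension.
pub fn extension_matches_content(p: &Path, head: &[u8]) -> bool {
    sniff_format(head) == format_from_extension(p)
}
```

A reader could call `extension_matches_content` on the first few bytes of the file and decide whether a mismatch should be an error or a warning.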

         } else {
             Box::new(file)
         };
@@ -313,6 +331,30 @@ mod tests {
         assert_ne!(text.metadata().unwrap().len(), gzipped.metadata().unwrap().len());
     }

+    #[test]
+    fn test_reading_and_writing_zstd_files() {
+        let lines = vec!["foo", "bar", "baz"];
+        let tempdir = TempDir::new().unwrap();
+        let text = tempdir.path().join("text.txt");
+        let zstd_compressed = tempdir.path().join("zstd_compressed.txt.zst");
kockan marked this conversation as resolved.

+        assert_eq!(Io::is_zstd_path(&text), false);
+        assert_eq!(Io::is_zstd_path(&zstd_compressed), true);
+
+        let io = Io::default();
+        io.write_lines(&text, &mut lines.iter()).unwrap();
+        io.write_lines(&zstd_compressed, &mut lines.iter()).unwrap();
+
+        let r1 = io.read_lines(&text).unwrap();
+        let r2 = io.read_lines(&zstd_compressed).unwrap();
+
+        assert_eq!(r1, lines);
+        assert_eq!(r2, lines);
+
+        // Check whether the two files are different
+        assert_ne!(text.metadata().unwrap().len(), zstd_compressed.metadata().unwrap().len());
kockan marked this conversation as resolved.
+    }
+
     #[test]
     fn test_reading_and_writing_empty_delim_file() {
         let recs: Vec<Rec> = vec![];