Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Add zstd support #9

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ keywords = ["utilities"]
[dependencies]
thiserror = "^1"

# For auto-gzip handing of files
# For auto-gzip handling of files
flate2 = "^1"

# For auto-zstd handling of files
zstd = "0.12.4"

# For auto-serialization of structs to csv/tsv
csv = "^1"
serde = { version = "^1.0.123", features = ["derive"] }
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[toolchain]
channel = "1.58.1"
channel = "1.71.1"
components = ["rustfmt", "clippy"]
49 changes: 44 additions & 5 deletions src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! I/O activities, such as slurping a file by lines, or writing a collection of `Serializable`
//! objects to a path.
//!
//! The two core parts of this module are teh [`Io`] and [`DelimFile`] structs. These structs provide
//! The two core parts of this module are the [`Io`] and [`DelimFile`] structs. These structs provide
//! methods for reading and writing to files that transparently handle compression based on the
//! file extension of the path given to the methods.
//!
Expand Down Expand Up @@ -51,10 +51,14 @@ use flate2::bufread::MultiGzDecoder;
use flate2::write::GzEncoder;
use flate2::Compression;
use serde::{de::DeserializeOwned, Serialize};
use zstd::stream::{Decoder, Encoder};
kockan marked this conversation as resolved.
Show resolved Hide resolved

/// The set of file extensions to treat as GZIPPED
const GZIP_EXTENSIONS: [&str; 2] = ["gz", "bgz"];

/// The set of file extensions to treat as ZSTD compressed
const ZSTD_EXTENSIONS: [&str; 1] = ["zst"];

/// The default buffer size when creating buffered readers/writers
const BUFFER_SIZE: usize = 64 * 1024;

Expand Down Expand Up @@ -90,8 +94,19 @@ impl Io {
}
}

/// Opens a file for reading. Transparently handles reading gzipped files based
/// extension.
/// Reports whether `p` ends with one of the extensions listed in `ZSTD_EXTENSIONS`.
///
/// Only the path's final extension is inspected and it must match exactly (the comparison
/// is case-sensitive). A path with no extension, or with an extension that is not valid
/// UTF-8, yields `false`.
fn is_zstd_path<P: AsRef<Path>>(p: &P) -> bool {
    p.as_ref()
        .extension()
        // A non-UTF-8 extension cannot equal any of the known (ASCII) extensions.
        .and_then(|raw_ext| raw_ext.to_str())
        .map_or(false, |ext| ZSTD_EXTENSIONS.contains(&ext))
}

/// Opens a file for reading. Transparently handles decoding gzip and zstd files.
pub fn new_reader<P>(&self, p: &P) -> Result<Box<dyn BufRead + Send>>
where
P: AsRef<Path>,
Expand All @@ -101,20 +116,23 @@ impl Io {

if Self::is_gzip_path(p) {
Ok(Box::new(BufReader::with_capacity(self.buffer_size, MultiGzDecoder::new(buf))))
} else if Self::is_zstd_path(p) {
Ok(Box::new(BufReader::with_capacity(self.buffer_size, Decoder::new(buf).unwrap())))
} else {
Ok(Box::new(buf))
}
}

/// Opens a file for writing. Transparently handles writing GZIP'd data if the file
/// ends with a recognized GZIP extension.
/// Opens a file for writing. Transparently handles encoding data in gzip and zstd formats.
pub fn new_writer<P>(&self, p: &P) -> Result<BufWriter<Box<dyn Write + Send>>>
where
P: AsRef<Path>,
{
let file = File::create(p).map_err(FgError::IoError)?;
let write: Box<dyn Write + Send> = if Io::is_gzip_path(p) {
Box::new(GzEncoder::new(file, self.compression))
} else if Io::is_zstd_path(p) {
Box::new(Encoder::new(file, 0).unwrap().auto_finish())
} else {
Box::new(file)
};
Expand Down Expand Up @@ -313,6 +331,27 @@ mod tests {
assert_ne!(text.metadata().unwrap().len(), gzipped.metadata().unwrap().len());
}

/// Round-trips the same lines through a plain-text file and a `.zst` file, then verifies
/// that the `.zst` file really contains zstd-encoded bytes.
#[test]
fn test_reading_and_writing_zstd_files() {
    let lines = vec!["foo", "bar", "baz"];
    let tempdir = TempDir::new().unwrap();
    let text = tempdir.path().join("text.txt");
    let zstd_compressed = tempdir.path().join("zstd_compressed.txt.zst");

    // Write identical content to both paths; only the file extension differs.
    let io = Io::default();
    io.write_lines(&text, &mut lines.iter()).unwrap();
    io.write_lines(&zstd_compressed, &mut lines.iter()).unwrap();

    // Both files must read back as the original lines.
    let r1 = io.read_lines(&text).unwrap();
    let r2 = io.read_lines(&zstd_compressed).unwrap();

    assert_eq!(r1, lines);
    assert_eq!(r2, lines);

    // Also check that we actually wrote zstd encoded data to the zstd file!
    assert_ne!(text.metadata().unwrap().len(), zstd_compressed.metadata().unwrap().len());

    // A size difference alone would also hold for any other encoding, so additionally
    // require the zstd frame magic number (0xFD2FB528, stored little-endian) up front.
    let raw = std::fs::read(&zstd_compressed).unwrap();
    assert!(
        raw.starts_with(&[0x28, 0xB5, 0x2F, 0xFD]),
        "zstd output does not start with the zstd frame magic number"
    );
}

#[test]
fn test_reading_and_writing_empty_delim_file() {
let recs: Vec<Rec> = vec![];
Expand Down