From 713cd16da50259c6625073e577dfd2fc0a3c7df1 Mon Sep 17 00:00:00 2001 From: Can Kockan Date: Mon, 21 Aug 2023 16:24:50 -0400 Subject: [PATCH] Initial code for sort-fastq --- Cargo.lock | 80 ++++++++++++++++++++++++++++ Cargo.toml | 1 + src/bin/commands/mod.rs | 1 + src/bin/commands/sort_fastq.rs | 96 ++++++++++++++++++++++++++++++++++ src/bin/main.rs | 2 + 5 files changed, 180 insertions(+) create mode 100644 src/bin/commands/sort_fastq.rs diff --git a/Cargo.lock b/Cargo.lock index db9bdea..816191b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -411,6 +411,19 @@ dependencies = [ "libc", ] +[[package]] +name = "ext-sort" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcf73e44617eab501beba39234441a194cf138629d3b6447f81f573e1c3d0a13" +dependencies = [ + "log", + "rayon", + "rmp-serde", + "serde", + "tempfile", +] + [[package]] name = "fastrand" version = "1.8.0" @@ -467,6 +480,7 @@ dependencies = [ "csv", "enum_dispatch", "env_logger", + "ext-sort", "fgoxide", "flate2", "itertools", @@ -615,6 +629,12 @@ dependencies = [ "libc", ] +[[package]] +name = "hermit-abi" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b" + [[package]] name = "humantime" version = "2.1.0" @@ -851,6 +871,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi 0.3.2", + "libc", +] + [[package]] name = "once_cell" version = "1.16.0" @@ -886,6 +916,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "paste" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c" + [[package]] name = "pin-project" version = "1.0.12" @@ -983,6 +1019,28 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rayon" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d2df5196e37bcc87abebc0053e20787d73847bb33134a69841207dd0a47f03b" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b8f95bd6966f5c87776639160a66bd8ab9895d9d4ab01ddba9fc60661aebe8d" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-utils", + "num_cpus", +] + [[package]] name = "read-structure" version = "0.1.0" @@ -1036,6 +1094,28 @@ dependencies = [ "winapi", ] +[[package]] +name = "rmp" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f9860a6cc38ed1da53456442089b4dfa35e7cedaa326df63017af88385e6b20" +dependencies = [ + "byteorder", + "num-traits", + "paste", +] + +[[package]] +name = "rmp-serde" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bffea85eea980d8a74453e5d02a8d93028f3c34725de143085a844ebe953258a" +dependencies = [ + "byteorder", + "rmp", + "serde", +] + [[package]] name = "rstest" version = "0.15.0" diff --git a/Cargo.toml b/Cargo.toml index 7c6ba55..d0f828f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ bstr = "1.0.1" clap = { version = "4.0.25", features = ["derive"] } enum_dispatch = "0.3.8" env_logger = "0.9.3" +ext-sort = "0.1.4" fgoxide = "0.3.0" flate2 = { version = "1.0.25", features = ["zlib-ng"] } # Force the faster backend that requires a C compiler itertools = "0.10.5" diff --git a/src/bin/commands/mod.rs b/src/bin/commands/mod.rs index e4b8baa..e4818ea 100644 --- a/src/bin/commands/mod.rs +++ b/src/bin/commands/mod.rs @@ -1,2 +1,3 @@ pub mod command; pub mod demux; +pub mod sort_fastq; diff --git a/src/bin/commands/sort_fastq.rs b/src/bin/commands/sort_fastq.rs new file mode 100644 index 0000000..e8fb240 --- /dev/null +++ b/src/bin/commands/sort_fastq.rs @@ -0,0 +1,96 @@ +use crate::commands::command::Command; +use anyhow::Result; +use clap::Parser; +use ext_sort::{ExternalSorter, ExternalSorterBuilder, LimitedBufferBuilder}; +use seq_io::fastq::Reader as FastqReader; +use seq_io::fastq::{OwnedRecord, Record}; +use serde::{Deserialize, Serialize}; +use std::cmp::Ordering; +use std::fs::File; +use std::io::BufWriter; +use std::path::{Path, PathBuf}; + +#[derive(Deserialize, Serialize)] +struct SortableFastqRecord(OwnedRecord); + +impl PartialEq for SortableFastqRecord { + fn eq(&self, other: &Self) -> bool { + self.0.id().unwrap() == other.0.id().unwrap() + } +} + +impl Eq for SortableFastqRecord {} + +impl PartialOrd for SortableFastqRecord { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(&other)) + } +} + +impl Ord for SortableFastqRecord { + fn cmp(&self, other: &Self) -> Ordering { + self.0.id().unwrap().cmp(&other.0.id().unwrap()) + } +} + +//////////////////////////////////////////////////////////////////////////////// +// SortFastq (main class) and it's impls +//////////////////////////////////////////////////////////////////////////////// + +/// Sorts the input FASTQ file by lexicographic ordering of its read names. +/// +/// ## Example Command Line +/// +/// ``` +/// fqtk sort-fastq \ +/// --input test.fq \ +/// --output test.sorted.fq \ +/// --max-records 1000000 +/// ``` +/// +#[derive(Parser, Debug)] +#[command(version)] +pub(crate) struct SortFastq { + /// Input fastq file + #[clap(long, short = 'i', required = true)] + input: PathBuf, + + /// Output fastq path + #[clap(long, short = 'o', required = true)] + output: PathBuf, + + /// Maximum number of records to be kept in buffer + #[clap(long, short = 'm', default_value = "1000000")] + max_records: usize, +} + +impl Command for SortFastq { + /// Executes the sort_fastq command + fn execute(&self) -> Result<()> { + let mut fq_reader = FastqReader::from_path(&self.input)?; + let mut fq_writer = BufWriter::new(File::create(&self.output)?); + + let sorter: ExternalSorter< + SortableFastqRecord, + seq_io::fastq::Error, + LimitedBufferBuilder, + > = ExternalSorterBuilder::new() + .with_tmp_dir(Path::new("./")) + .with_buffer(LimitedBufferBuilder::new(self.max_records, true)) + .build() + .unwrap(); + + let sorted = sorter + .sort( + fq_reader.records().map(|record| record.map(|record| SortableFastqRecord(record))), + ) + .unwrap(); + + for item in sorted.map(Result::unwrap) { + let _ = + seq_io::fastq::write_to(&mut fq_writer, &item.0.head, &item.0.seq, &item.0.qual); + } + + Ok(()) + } +} diff --git a/src/bin/main.rs b/src/bin/main.rs index 3b6dd4e..ee627dd 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -6,6 +6,7 @@ use anyhow::Result; use clap::Parser; use commands::command::Command; use commands::demux::Demux; +use commands::sort_fastq::SortFastq; use enum_dispatch::enum_dispatch; use env_logger::Env; @@ -23,6 +24,7 @@ struct Args { #[command(version)] enum Subcommand { Demux(Demux), + SortFastq(SortFastq), } fn main() -> Result<()> {