Merge pull request #114 from oscar-project/uj/osc-75-group-split-and-compress-steps

Add splitting and compression at ungoliant runtime
Uinelj authored Aug 8, 2023
2 parents 876e4f2 + 3bfba79 commit d772eb1
Showing 7 changed files with 129 additions and 43 deletions.
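With this change, a pipeline run can split output into size-capped parts and compress them on the fly, e.g. `ungoliant pipeline --split_size 500 -c …` (illustrative invocation; the two flags come from the cli.rs diff below, the remaining arguments are unchanged): parts are then capped at roughly 500 MB and written zstd-compressed.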
26 changes: 16 additions & 10 deletions Cargo.toml
@@ -1,7 +1,9 @@
[package]
name = "ungoliant"
version = "2.0.0"
authors = ["Julien Abadji <[email protected]>, Pedro J. Ortiz <[email protected]>"]
authors = [
"Julien Abadji <[email protected]>, Pedro J. Ortiz <[email protected]>",
]
edition = "2021"
description = "The pipeline for the OSCAR corpus."
license = "Apache-2.0"
@@ -11,8 +13,12 @@ repository = "https://github.com/oscar-project/ungoliant"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
reqwest = { version = "0.11", default-features=false, features = ["rustls-tls", "blocking", "stream"] }
flate2 = { version = "1.0.20"}
reqwest = { version = "0.11", default-features = false, features = [
"rustls-tls",
"blocking",
"stream",
] }
flate2 = { version = "1.0.20" }
futures-core = "0.3"
futures-util = "0.3"
futures = "0.3"
@@ -21,8 +27,8 @@ env_logger = "0.8.3"
log = "0.4.14"
itertools = "0.10.0"
tokio = { version = "1", features = ["full"] }
tokio-util = {version="0.6.6", features=["compat"]}
warc = {version="0.3.0", features=["with_serde"]}
tokio-util = { version = "0.6.6", features = ["compat"] }
warc = { version = "0.3.0", features = ["with_serde"] }
ut1_blocklist = "0.3.0"
fasttext = "0.7.6"
bytes = "1"
@@ -42,14 +48,13 @@ unicode-script = "0.5.4"
unicode-segmentation = "1.8.0"
csv = "1.1.6"
unic-ucd = "0.9.0"
oxilangtag = {version="0.1.3", features=["serde"]}
oxilangtag = { version = "0.1.3", features = ["serde"] }
language-tags = "0.3.2"
lazy_static = "1.4.0"
oscar-io = "0.2.2"
#tlsh = {git="https://github.com/Uinelj/tlsh-rs", branch="fix-q3-panic"}
oscar-io = "0.4.0"
tlsh-fixed = "0.1.1"

ctclib-pp = {version="0.2.0", optional=true}
ctclib-pp = { version = "0.2.0", optional = true }


[features]
@@ -60,8 +65,9 @@ rand_distr = "0.4.2"
sha-1 = "0.9"
criterion = "0.3"
serial_test = "0.5.1"
tempfile="3.2.0"
tempfile = "3.2.0"
test-log = "0.2.11"
zstd = "0.12.4"

[[bench]]
name = "fasttext_bench"
9 changes: 9 additions & 0 deletions src/cli.rs
@@ -186,4 +186,13 @@ pub struct Pipeline {
help = "Optional path to kenlm folder. for the language xx, you have to have a xx.binary file."
)]
pub kenlms_path: Option<PathBuf>,

#[structopt(
help = "Split size (in MBytes). Default: No splitting",
long = "split_size"
)]
pub split: Option<u64>,

#[structopt(short = "c", long = "comp", help = "Enables zstd compression")]
pub comp: bool,
}
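A minimal sketch of how the two new flags parse, assuming a hypothetical stand-alone struct that mirrors just the added fields (the real `Pipeline` struct above has many more options):

```rust
use structopt::StructOpt;

#[derive(Debug, StructOpt)]
struct SplitCompOpts {
    /// Split size (in MBytes). Default: no splitting.
    #[structopt(long = "split_size")]
    split: Option<u64>,

    /// Enables zstd compression.
    #[structopt(short = "c", long = "comp")]
    comp: bool,
}

fn main() {
    // Stands in for the usual `from_args()` call on the full CLI.
    let opts = SplitCompOpts::from_iter(vec!["prog", "--split_size", "500", "-c"]);
    assert_eq!(opts.split, Some(500));
    assert!(opts.comp);
}
```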
102 changes: 78 additions & 24 deletions src/io/langfiles.rs
@@ -4,6 +4,8 @@ Each language (provided by [crate::lang::LANG]) is given a [self::Writer] wrappe
## Warning
When using compression, ensure that you **drop** [LangFilesDoc] before trying to read written data. This is because [zstd] only finishes the compressed frame when the writer is dropped.
!*/
use std::{
collections::HashMap,
@@ -28,6 +30,7 @@ use oscar_io::v3::{Writer, WriterTrait};
type LanguageMap = HashMap<LanguageTag<String>, Arc<Mutex<Writer>>>;
pub struct LangFilesDoc {
writers: Arc<RwLock<LanguageMap>>,
comp: bool,
dst: PathBuf,
part_size_bytes: Option<u64>,
}
@@ -76,20 +79,27 @@ impl LangFilesDoc {
/// Also keep in mind that [Self::close_meta] has to be called once every write is done.
///
// [Self::close_meta] could be integrated in an `impl Drop`
pub fn new(dst: &Path, part_size_bytes: Option<u64>) -> Self {
pub fn new(dst: &Path, part_size_bytes: Option<u64>, comp: bool) -> Self {
Self {
writers: Arc::new(RwLock::new(HashMap::new())),
dst: dst.to_path_buf(),
part_size_bytes,
comp,
}
}

fn new_writer(
dst: &Path,
lang: LanguageTag<String>,
part_size_bytes: Option<u64>,
comp: bool,
) -> Result<Arc<Mutex<Writer>>, Error> {
let w = Writer::new(dst, lang, part_size_bytes)?;
let comp = if comp {
Some(oscar_io::v3::Comp::Zstd { level: 0 })
} else {
None
};
let w = Writer::new(dst, lang, part_size_bytes, comp)?;

Ok(Arc::new(Mutex::new(w)))
}
@@ -115,6 +125,7 @@ impl LangFilesDoc {
&self.dst,
k.clone(),
self.part_size_bytes,
self.comp,
)?);

info!("{k}: Done");
@@ -127,12 +138,22 @@ impl LangFilesDoc {
) -> std::sync::RwLockReadGuard<HashMap<LanguageTag<String>, Arc<Mutex<Writer>>>> {
self.writers.read().unwrap()
}

/// Flushes all writers.
pub fn flush_all(&self) -> Result<(), Error> {
for writer in self.writers.read().unwrap().values() {
let mut lock = writer.try_lock().unwrap();
lock.flush()?;
}

Ok(())
}
}

#[cfg(test)]
mod tests {

use std::{fs::File, path::PathBuf};
use std::{fs::File, io::Read, path::PathBuf};

use crate::pipelines::oscardoc::types::{Document, Metadata};
use warc::{BufferedBody, Record, WarcHeader};
@@ -143,16 +164,32 @@ mod tests {

type WarcHeaders = HashMap<WarcHeader, Vec<u8>>;

fn get_docs() -> Vec<Document> {
let content = "Hello!".to_string();

let record = Record::default();
let record: Record<BufferedBody> = record.add_body(content.clone());

let record_id = Identification::new(LanguageTag::parse("en".to_string()).unwrap(), 1.0);
let sentences_id = vec![Some(record_id.clone())];

let metadata = Metadata::new(&record_id, &sentences_id);
let (headers, _) = record.into_raw_parts();

let docs = vec![Document::new(content, headers.headers, metadata)];
docs
}

#[test]
fn init_doc() {
let dst = tempdir().unwrap();
let _: LangFilesDoc = LangFilesDoc::new(dst.path(), None);
let _: LangFilesDoc = LangFilesDoc::new(dst.path(), None, false);
}

#[test]
fn test_contains() {
let dst = tempdir().unwrap();
let lf: LangFilesDoc = LangFilesDoc::new(dst.path(), None);
let lf: LangFilesDoc = LangFilesDoc::new(dst.path(), None, false);
let language = LanguageTag::parse("fr".to_string()).unwrap();

assert!(!lf.contains(&language));
@@ -165,24 +202,9 @@
#[test]
fn write_one_doc() {
let dst = tempdir().unwrap();
let lf: LangFilesDoc = LangFilesDoc::new(dst.path(), None);

let content = "Hello!".to_string();

let record = Record::default();
let record: Record<BufferedBody> = record.add_body(content);
let lf: LangFilesDoc = LangFilesDoc::new(dst.path(), None, false);

let record_id = Identification::new(LanguageTag::parse("en".to_string()).unwrap(), 1.0);
let sentences_id = vec![Some(record_id.clone())];

let metadata = Metadata::new(&record_id, &sentences_id);
let (headers, content) = record.into_raw_parts();

let docs = vec![Document::new(
String::from_utf8_lossy(&content).to_string(),
headers.headers,
metadata,
)];
let docs = get_docs();

lf.insert_writer(docs[0].identification().label().clone())
.unwrap();
@@ -194,14 +216,46 @@

if let Ok(mut w) = w.try_lock() {
w.write(docs.to_vec()).unwrap();
w.flush().unwrap();
}

let mut read_path = PathBuf::from(dst.path());
read_path.push("en_meta.jsonl");
read_path.push("en.jsonl");

let b = File::open(read_path).unwrap();
let doc_from_file: Document = serde_json::from_reader(b).unwrap();

assert_eq!(doc_from_file, docs[0]);
}

#[test]
fn write_one_doc_comp() {
let dst = tempdir().unwrap();
let docs = get_docs();

{
let lf: LangFilesDoc = LangFilesDoc::new(dst.path(), None, true);

lf.insert_writer(docs[0].identification().label().clone())
.unwrap();
let w = lf
.writers()
.get(docs[0].identification().label())
.unwrap()
.clone();

if let Ok(mut w) = w.try_lock() {
w.write(docs.to_vec()).unwrap();
w.flush().unwrap();
};
}
// lf.flush_all().unwrap();
let mut read_path = PathBuf::from(dst.path());
read_path.push("en.jsonl.zstd");

let b = File::open(&read_path).unwrap();
let dec = zstd::decode_all(b).unwrap();
let doc_from_file: Document = serde_json::from_slice(&dec).unwrap();

assert_eq!(doc_from_file, docs[0]);
}
}
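The module warning above is the operational detail to keep in mind: with compression on, zstd only completes its frame once the writer is flushed or dropped. A minimal read-back sketch under that assumption, mirroring `write_one_doc_comp` (the `en.jsonl.zstd` name is taken from that test):

```rust
use std::{fs::File, path::Path};

/// Reads one JSON document back from a zstd-compressed corpus file.
/// Only call this after the corresponding writer has been flushed or
/// dropped; before that, the final zstd frame may be incomplete.
fn read_one_compressed(path: &Path) -> serde_json::Value {
    let f = File::open(path).expect("corpus file exists");
    let bytes = zstd::decode_all(f).expect("complete zstd frame");
    // `write_one_doc_comp` writes a single document; with several
    // documents this would be JSON Lines, parsed line by line.
    serde_json::from_slice(&bytes).expect("valid JSON document")
}
```

Note also that the uncompressed test now reads `en.jsonl` instead of `en_meta.jsonl`, matching the file naming of the bumped oscar-io 0.4 writer.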
13 changes: 11 additions & 2 deletions src/main.rs
@@ -63,8 +63,17 @@ async fn main() -> Result<(), error::Error> {

cli::Ungoliant::Pipeline(p) => {
let mut schema_filepath = p.dst.clone();
let p =
pipelines::OscarDocNew::new(p.src, p.dst, p.lid_path, p.blocklist, p.kenlms_path);

// todo: oscardocnew implements from?
let p = pipelines::OscarDocNew::new(
p.src,
p.dst,
p.lid_path,
p.blocklist,
p.kenlms_path,
p.split.map(|size_mbytes| size_mbytes * 1_000_000),
p.comp,
);
p.run()?;

schema_filepath.push("metadata_schema.json");
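Note that the split size is interpreted as decimal megabytes: `--split_size 500` becomes 500 * 1_000_000 bytes, not 500 * 2^20.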
12 changes: 11 additions & 1 deletion src/pipelines/oscardoc/pipeline.rs
@@ -58,6 +58,8 @@ pub struct OscarDoc {
lid_path: PathBuf,
blocklist: Option<PathBuf>,
kenlms_path: Option<PathBuf>,
split: Option<u64>, // in bytes
comp: bool,
}

impl OscarDoc {
@@ -67,6 +69,8 @@ impl OscarDoc {
lid_path: PathBuf,
blocklist: Option<PathBuf>,
kenlms_path: Option<PathBuf>,
split: Option<u64>,
comp: bool,
) -> Self {
if blocklist.is_none() {
warn!("No blocklist folder specified! No adult content tagging will be done.");
@@ -79,6 +83,8 @@ impl OscarDoc {
lid_path,
blocklist,
kenlms_path,
split,
comp,
}
}

@@ -448,7 +454,7 @@ impl Pipeline<()> for OscarDoc {
// ourselves.
let results = results.enumerate().par_bridge();

let langfiles = LangFilesDoc::new(&self.dst, None);
let langfiles = LangFilesDoc::new(&self.dst, self.split, self.comp);

#[cfg(feature = "kenlm")]
let kenlms = if let Some(kenlms_path) = &self.kenlms_path {
if !kenlms_path.is_dir() {
@@ -510,6 +516,10 @@ }
}
});

// flush writers
info!("Flushing writers");
langfiles.flush_all()?;
info!("Done");

Check warning on line 522 in src/pipelines/oscardoc/pipeline.rs

View check run for this annotation

Codecov / codecov/patch

src/pipelines/oscardoc/pipeline.rs#L520-L522

Added lines #L520 - L522 were not covered by tests
Ok(())
}
}
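The explicit `flush_all` before returning matters mostly when compression is enabled: per the langfiles.rs warning, the final zstd frame is only finished once a writer is flushed or dropped, so skipping this step could leave the tail of each language file unreadable.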
2 changes: 1 addition & 1 deletion src/processing/rebuild.rs
@@ -277,7 +277,7 @@ impl<'a> Rebuilder<'a> {
}

// create mutex
let wr = Arc::new(Mutex::new(Writer::new(self.dst, self.lang, None)?));
let wr = Arc::new(Mutex::new(Writer::new(self.dst, self.lang, None, None)?));

// iterate over shard results
let errors: Vec<Result<(), Error>> = sr
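The rebuilder keeps both new features disabled, passing `None` for part size and `None` for compression, so rebuilt corpora remain unsplit and uncompressed.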
8 changes: 3 additions & 5 deletions tests/oscardoc_rebuild.rs
@@ -24,7 +24,7 @@ fn gen_corpus() {
let kenlm = Path::new("res/kenlm/").to_path_buf();

//TODO test with custom blocklists
let pipeline = OscarDoc::new(src, dst, lid, Some(bl), Some(kenlm));
let pipeline = OscarDoc::new(src, dst, lid, Some(bl), Some(kenlm), None, false);
pipeline.run().expect(
"Ensure to have shards in res/shards, lid.176.bin at root and blocklist at res/blocklist",
);
@@ -52,14 +52,12 @@ fn check_rebuild() {
rb.run().unwrap();

// open source corpus, store documents and order them by record id
let f = File::open(&src_corpus).unwrap();
let doc_reader_source = oscar_io::oscar_doc::Reader::new(BufReader::new(f));
let doc_reader_source = oscar_io::v3::Reader::from_path(&src_corpus).unwrap();
let mut docs_source = doc_reader_source.map(|x| x.unwrap()).collect::<Vec<_>>();
docs_source.sort_unstable_by(|a, b| get_record_id(a).cmp(&get_record_id(b)));
// open rebuilt corpus
dst.push("fr_meta.jsonl");
let f = File::open(&dst).unwrap();
let doc_reader_rebuild = oscar_io::oscar_doc::Reader::new(BufReader::new(f));
let doc_reader_rebuild = oscar_io::v3::Reader::from_path(&dst).unwrap();
let mut docs_rebuild = doc_reader_rebuild.map(|x| x.unwrap()).collect::<Vec<_>>();
docs_rebuild.sort_unstable_by(|a, b| get_record_id(a).cmp(&get_record_id(b)));

