Add option to preserve row order in streaming + parallel mode
fxpineau committed Apr 15, 2024
1 parent 3ef240b commit 67e9388
Showing 1 changed file with 142 additions and 50 deletions.
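The heart of the change is in convert_par below: instead of a single shared channel on each side of the worker pool, the keep-order path creates one bounded channel per worker and cycles over them, so the sink collects converted chunks in exactly the order the producer dispatched them. The standalone sketch below (not part of the commit; names such as n_workers and the integer payload are illustrative, and it passes single items where the real code passes row chunks) shows that pattern with crossbeam channels and std::thread::scope:

use crossbeam::channel::{bounded, Receiver, Sender};
use std::thread;

fn main() {
  let n_workers = 4;
  let input: Vec<u64> = (0..20).collect();
  // One bounded channel per worker, on both the inbound and outbound side.
  let (in_txs, in_rxs): (Vec<Sender<u64>>, Vec<Receiver<u64>>) =
    (0..n_workers).map(|_| bounded(1)).unzip();
  let (out_txs, out_rxs): (Vec<Sender<u64>>, Vec<Receiver<u64>>) =
    (0..n_workers).map(|_| bounded(1)).unzip();
  thread::scope(|s| {
    // Producer: dispatch items to the workers in round-robin order.
    s.spawn(move || {
      {
        let mut txs = in_txs.iter().cycle();
        for item in input {
          txs.next().unwrap().send(item).unwrap();
        }
      }
      drop(in_txs); // close the inbound channels so the workers exit
    });
    // Workers: each owns a single inbound and a single outbound channel.
    for (rx, tx) in in_rxs.into_iter().zip(out_txs) {
      s.spawn(move || {
        for item in rx.iter() {
          tx.send(item * 10).unwrap(); // stand-in for the real row conversion
        }
        // `tx` is dropped here, closing this worker's outbound channel.
      });
    }
    // Sink: read the outbound channels in the same round-robin order,
    // so the output sequence matches the input sequence exactly.
    'sink: loop {
      for rx in &out_rxs {
        match rx.recv() {
          Ok(v) => print!("{} ", v), // prints 0 10 20 ... 190, in order
          Err(_) => break 'sink,     // all channels drained and closed
        }
      }
    }
    println!();
  });
}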
192 changes: 142 additions & 50 deletions crates/cli/src/streaming.rs
@@ -9,7 +9,7 @@ use std::{
};

use clap::Args;
use crossbeam::channel::bounded;
use crossbeam::channel::{bounded, Receiver, Sender};
use serde::{de::DeserializeSeed, Deserializer};

use votable::{
@@ -54,7 +54,6 @@ impl FromStr for OutputFormat {
}

/// Convert a single table XML VOTable in streaming mode.
/// Tags after `</TABLE>` are preserved.
#[derive(Debug, Args)]
pub struct StreamConvert {
/// Path of the input XML VOTable [default: read from stdin]
@@ -72,6 +71,9 @@ pub struct StreamConvert {
/// Exec concurrently using N threads (row order not preserved!)
#[arg(long, value_name = "N")]
parallel: Option<usize>,
/// In parallel mode, keep the input table row order
#[arg(long, value_name = "keep_row_order")]
keep_row_order: bool,
/// Number of rows processed by the same thread in `parallel` mode
#[arg(long, default_value_t = 10_000_usize)]
chunk_size: usize,
Expand Down Expand Up @@ -124,18 +126,20 @@ impl StreamConvert {
W: Write,
{
match it.data_type() {
// In base64, 6 bits -> 1 ASCII char
// 3 bytes = 24 bits -> 4 ASCII chars
TableOrBinOrBin2::TableData => {
match self.output_fmt {
OutputFormat::XmlTabledata => to_same(it, write),
OutputFormat::XmlBinary => match self.parallel {
None => to_binary(it, write),
Some(n_threads) => td_to_binary_par(it, write, n_threads, self.chunk_size),
Some(n_threads) => {
td_to_binary_par(it, write, n_threads, self.chunk_size, self.keep_row_order)
}
},
OutputFormat::XmlBinary2 => match self.parallel {
None => to_binary2(it, write),
Some(n_threads) => td_to_binary2_par(it, write, n_threads, self.chunk_size),
Some(n_threads) => {
td_to_binary2_par(it, write, n_threads, self.chunk_size, self.keep_row_order)
}
},
OutputFormat::CSV => {
let mut raw_row_it = it.to_owned_tabledata_row_iterator();
@@ -170,6 +174,7 @@ impl StreamConvert {
self.separator,
n_threads,
self.chunk_size,
self.keep_row_order,
)
}
}
@@ -179,12 +184,16 @@ impl StreamConvert {
TableOrBinOrBin2::Binary => match self.output_fmt {
OutputFormat::XmlTabledata => match self.parallel {
None => to_tabledata(it, write),
Some(n_threads) => binary_to_td_par(it, write, n_threads, self.chunk_size),
Some(n_threads) => {
binary_to_td_par(it, write, n_threads, self.chunk_size, self.keep_row_order)
}
},
OutputFormat::XmlBinary => to_same(it, write),
OutputFormat::XmlBinary2 => match self.parallel {
None => to_binary2(it, write),
Some(n_threads) => binary_to_binary2_par(it, write, n_threads, self.chunk_size),
Some(n_threads) => {
binary_to_binary2_par(it, write, n_threads, self.chunk_size, self.keep_row_order)
}
},
OutputFormat::CSV => match self.parallel {
None => to_csv(it, write, self.separator),
@@ -202,18 +211,23 @@ impl StreamConvert {
self.separator,
n_threads,
self.chunk_size,
self.keep_row_order,
)
}
},
},
TableOrBinOrBin2::Binary2 => match self.output_fmt {
OutputFormat::XmlTabledata => match self.parallel {
None => to_tabledata(it, write),
Some(n_threads) => binary2_to_td_par(it, write, n_threads, self.chunk_size),
Some(n_threads) => {
binary2_to_td_par(it, write, n_threads, self.chunk_size, self.keep_row_order)
}
},
OutputFormat::XmlBinary => match self.parallel {
None => to_binary(it, write),
Some(n_threads) => binary2_to_binary_par(it, write, n_threads, self.chunk_size),
Some(n_threads) => {
binary2_to_binary_par(it, write, n_threads, self.chunk_size, self.keep_row_order)
}
},
OutputFormat::XmlBinary2 => to_same(it, write),
OutputFormat::CSV => match self.parallel {
@@ -232,6 +246,7 @@ impl StreamConvert {
self.separator,
n_threads,
self.chunk_size,
self.keep_row_order,
)
}
},
@@ -525,6 +540,7 @@ fn td_to_binary_par<R: BufRead + Send, W: Write>(
write: W,
n_threads: usize,
chunk_size: usize,
keep_row_order: bool,
) -> Result<(), VOTableError> {
let mut writer = new_xml_writer(write, None, None);
if it
@@ -571,6 +587,7 @@ fn td_to_binary_par<R: BufRead + Send, W: Write>(
' ',
n_threads,
chunk_size,
keep_row_order,
)
.and_then(|_| raw_row_it.read_to_end())
.and_then(|mut out_vot| out_vot.write_from_data_end(&mut writer, &(), false))
@@ -585,6 +602,7 @@ fn td_to_binary2_par<R: BufRead + Send, W: Write>(
write: W,
n_threads: usize,
chunk_size: usize,
keep_row_order: bool,
) -> Result<(), VOTableError> {
let mut writer = new_xml_writer(write, None, None);
if it
@@ -620,6 +638,7 @@ fn td_to_binary2_par<R: BufRead + Send, W: Write>(
' ',
n_threads,
chunk_size,
keep_row_order,
)
.and_then(|_| raw_row_it.read_to_end())
.and_then(|mut out_vot| out_vot.write_from_data_end(&mut writer, &(), false))
@@ -634,6 +653,7 @@ fn binary_to_td_par<R: BufRead + Send, W: Write>(
write: W,
n_threads: usize,
chunk_size: usize,
keep_row_order: bool,
) -> Result<(), VOTableError> {
let mut writer = new_xml_writer(write, None, None);
if it
@@ -663,6 +683,7 @@ fn binary_to_td_par<R: BufRead + Send, W: Write>(
' ',
n_threads,
chunk_size,
keep_row_order,
)
.and_then(|_| raw_row_it.read_to_end())
.and_then(|mut out_vot| out_vot.write_from_data_end(&mut writer, &(), false))
@@ -677,6 +698,7 @@ fn binary_to_binary2_par<R: BufRead + Send, W: Write>(
write: W,
n_threads: usize,
chunk_size: usize,
keep_row_order: bool,
) -> Result<(), VOTableError> {
let mut writer = new_xml_writer(write, None, None);
if it
@@ -706,6 +728,7 @@ fn binary_to_binary2_par<R: BufRead + Send, W: Write>(
' ',
n_threads,
chunk_size,
keep_row_order,
)
.and_then(|_| raw_row_it.read_to_end())
.and_then(|mut out_vot| out_vot.write_from_data_end(&mut writer, &(), false))
@@ -721,6 +744,7 @@ fn binary2_to_td_par<R: BufRead + Send, W: Write>(
write: W,
n_threads: usize,
chunk_size: usize,
keep_row_order: bool,
) -> Result<(), VOTableError> {
let mut writer = new_xml_writer(write, None, None);
if it
@@ -754,6 +778,7 @@ fn binary2_to_td_par<R: BufRead + Send, W: Write>(
' ',
n_threads,
chunk_size,
keep_row_order,
)
.and_then(|_| raw_row_it.read_to_end())
.and_then(|mut out_vot| out_vot.write_from_data_end(&mut writer, &(), false))
@@ -768,6 +793,7 @@ fn binary2_to_binary_par<R: BufRead + Send, W: Write>(
write: W,
n_threads: usize,
chunk_size: usize,
keep_row_order: bool,
) -> Result<(), VOTableError> {
let mut writer = new_xml_writer(write, None, None);
if it
@@ -801,6 +827,7 @@ fn binary2_to_binary_par<R: BufRead + Send, W: Write>(
' ',
n_threads,
chunk_size,
keep_row_order,
)
.and_then(|_| raw_row_it.read_to_end())
.and_then(|mut out_vot| out_vot.write_from_data_end(&mut writer, &(), false))
@@ -835,59 +862,124 @@ fn convert_par<I, W>(
separator: char,
n_threads: usize,
chunk_size: usize,
keep_order: bool,
) -> Result<(), VOTableError>
where
I: Iterator<Item = Result<Vec<u8>, VOTableError>> + Send,
W: Write,
{
  let schema = schema;
  if keep_order {
    let n_threads = n_threads.max(1);
    let (mut senders1, receivers1): (Vec<Sender<_>>, Vec<Receiver<_>>) =
      (0..n_threads).map(|_| bounded(1)).unzip();
    let (mut senders2, receivers2): (Vec<Sender<_>>, Vec<Receiver<_>>) =
      (0..n_threads).map(|_| bounded(1)).unzip();
    scope(|s| {
      // Producer thread: dispatch chunks to the workers in round-robin order
      s.spawn(|| {
        {
          let mut rows_chunk = load_n(raw_row_it, chunk_size);
          let mut senders_it = senders1.iter().cycle();
          while !rows_chunk.is_empty() {
            senders_it
              .next()
              .unwrap()
              .send(rows_chunk)
              .expect("Unexpected error sending raw rows");
            rows_chunk = load_n(raw_row_it, chunk_size);
          }
        }
        // Close the channels, otherwise the workers never exit their for-loops
        senders1.drain(..).for_each(|sender| drop(sender));
      });
      // Parallel processing by n_threads
      for (sendr2, recvr1) in senders2.iter().cloned().zip(receivers1.iter().cloned()) {
        // Send to sink, receive from producer
        let schema = schema.clone();
        // Spawn workers in separate threads
        s.spawn(move || {
          // Receive until channel closes
          for raw_rows_chunk in recvr1.iter() {
            let converted_raw_rows_chunk = raw_rows_chunk
              .iter()
              .map(|raw_row| convert(raw_row, &schema, separator))
              .collect::<Vec<Box<[u8]>>>();
            sendr2
              .send(converted_raw_rows_chunk)
              .expect("Unexpected error sending converted rows");
          }
        });
      }
      // Close the channels, otherwise the sink never exits its loop
      senders2.drain(..).for_each(|sender| drop(sender));
      // Sink: collect converted chunks in the same round-robin order they were dispatched
      for recvr2 in receivers2.iter().cycle() {
        match recvr2.recv() {
          Ok(raw_rows) => {
            for raw_row in raw_rows {
              match write.write_all(&raw_row) {
                Ok(()) => (),
                Err(e) => panic!("Error writing in parallel: {:?}", e),
              }
            }
          }
          Err(_) => {
            // No more data to be written
            break;
          }
        }
      }
    });
  } else {
    // Usage of crossbeam from https://rust-lang-nursery.github.io/rust-cookbook/concurrency/threads.html
    let (snd1, rcv1) = bounded(1);
    let (snd2, rcv2) = bounded(1);
    scope(|s| {
      // Producer thread
      s.spawn(|| {
        let mut rows_chunk = load_n(raw_row_it, chunk_size);
        while !rows_chunk.is_empty() {
          snd1
            .send(rows_chunk)
            .expect("Unexpected error sending raw rows");
          rows_chunk = load_n(raw_row_it, chunk_size);
        }
        // Close the channel, otherwise the sink never exits its for-loop
        drop(snd1);
      });
      // Parallel processing by n_threads
      for _ in 0..n_threads {
        // Send to sink, receive from source
        let (sendr, recvr) = (snd2.clone(), rcv1.clone());
        let schema = schema.clone();
        // Spawn workers in separate threads
        s.spawn(move || {
          // Receive until channel closes
          for raw_rows_chunk in recvr.iter() {
            let converted_raw_rows_chunk = raw_rows_chunk
              .iter()
              .map(|raw_row| convert(raw_row, &schema, separator))
              .collect::<Vec<Box<[u8]>>>();
            sendr
              .send(converted_raw_rows_chunk)
              .expect("Unexpected error sending converted rows");
          }
        });
      }
      // Close the channel, otherwise the sink never exits its for-loop
      drop(snd2);
      // Sink
      for raw_rows in rcv2.iter() {
        for raw_row in raw_rows {
          match write.write_all(&raw_row) {
            Ok(()) => (),
            Err(e) => panic!("Error writing in parallel: {:?}", e),
          }
        }
      }
    });
  }
  Ok(())
}
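A note on the design choice (my reading, not the author's stated rationale): dispatch and collection follow the same cyclic order over channels of capacity 1, so chunk k is always written k-th and no unbounded reordering buffer is needed; the price is head-of-line blocking, where one slow chunk stalls the sink even if later chunks are already converted, which is presumably why ordering stays opt-in. With clap's derive conventions, the new keep_row_order field should surface as a --keep-row-order flag alongside --parallel and --chunk-size.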
