Skip to content

Commit 493e400

Browse files
committed
Add a new parser that does not buffer lines
1 parent d8e28d3 commit 493e400

File tree

3 files changed

+298
-0
lines changed

3 files changed

+298
-0
lines changed

src/bin/tuc.rs

+3
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use tuc::cut_lines::read_and_cut_lines;
99
use tuc::cut_str::read_and_cut_str;
1010
use tuc::help::{get_help, get_short_help};
1111
use tuc::options::{Opt, EOL};
12+
use tuc::stream::{read_and_cut_bytes_stream, StreamOpt};
1213

1314
#[cfg(feature = "fast-lane")]
1415
use tuc::fast_lane::{read_and_cut_text_as_bytes, FastOpt};
@@ -251,6 +252,8 @@ fn main() -> Result<()> {
251252
read_and_cut_bytes(&mut stdin, &mut stdout, &opt)?;
252253
} else if opt.bounds_type == BoundsType::Lines {
253254
read_and_cut_lines(&mut stdin, &mut stdout, &opt)?;
255+
} else if let Ok(stream_opt) = StreamOpt::try_from(&opt) {
256+
read_and_cut_bytes_stream(&mut stdin, &mut stdout, &stream_opt)?;
254257
} else if let Ok(fast_opt) = FastOpt::try_from(&opt) {
255258
read_and_cut_text_as_bytes(&mut stdin, &mut stdout, &fast_opt)?;
256259
} else {

src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ pub mod fast_lane;
77
pub mod help;
88
pub mod options;
99
mod read_utils;
10+
pub mod stream;

src/stream.rs

+294
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,294 @@
1+
use crate::bounds::{BoundOrFiller, BoundsType, Side, UserBounds, UserBoundsList, UserBoundsTrait};
2+
use crate::options::{Opt, EOL};
3+
use anyhow::Result;
4+
use bstr::ByteSlice;
5+
use std::convert::TryFrom;
6+
use std::io::BufRead;
7+
use std::io::Write;
8+
use std::ops::Deref;
9+
10+
#[derive(Debug)]
11+
struct ForwardBounds {
12+
pub list: UserBoundsList,
13+
last_bound_idx: usize,
14+
}
15+
16+
impl TryFrom<&UserBoundsList> for ForwardBounds {
17+
type Error = &'static str;
18+
19+
fn try_from(value: &UserBoundsList) -> Result<Self, Self::Error> {
20+
if value.is_empty() {
21+
Err("Cannot create ForwardBounds from an empty UserBoundsList")
22+
} else if value.is_forward_only() {
23+
let value: UserBoundsList =
24+
value.iter().cloned().collect::<Vec<BoundOrFiller>>().into();
25+
let mut maybe_last_bound: Option<usize> = None;
26+
value.iter().enumerate().rev().any(|(idx, bof)| {
27+
if matches!(bof, BoundOrFiller::Bound(_)) {
28+
maybe_last_bound = Some(idx);
29+
true
30+
} else {
31+
false
32+
}
33+
});
34+
35+
if let Some(last_bound_idx) = maybe_last_bound {
36+
Ok(ForwardBounds {
37+
list: value,
38+
last_bound_idx,
39+
})
40+
} else {
41+
Err("Cannot create ForwardBounds from UserBoundsList without bounds")
42+
}
43+
} else {
44+
Err("The provided UserBoundsList is not forward only")
45+
}
46+
}
47+
}
48+
49+
impl Deref for ForwardBounds {
50+
type Target = UserBoundsList;
51+
52+
fn deref(&self) -> &Self::Target {
53+
&self.list
54+
}
55+
}
56+
57+
impl ForwardBounds {
58+
fn get_last_bound(&self) -> &UserBounds {
59+
if let Some(BoundOrFiller::Bound(b)) = self.list.get(self.last_bound_idx) {
60+
b
61+
} else {
62+
panic!("Invariant error: last_bound_idx failed to match a bound")
63+
}
64+
}
65+
}
66+
67+
#[derive(Debug)]
68+
pub struct StreamOpt {
69+
delimiter: u8,
70+
join: bool,
71+
eol: EOL,
72+
bounds: ForwardBounds,
73+
// ## only_delimited: bool, ##
74+
// We can't support it, because we read fixed blocks of data.
75+
// If we don't find the delimiter in one block and move the next block,
76+
// and then we find the delimiter, we can't print the content from the
77+
// previous block, it's lost.
78+
// The alternative would be to start buffering blocks, but who knows
79+
// how much they'd grow: it would be not different from buffering the
80+
// whole line, and this mode is about doing the job with fixed memory.
81+
}
82+
83+
impl TryFrom<&Opt> for StreamOpt {
84+
type Error = &'static str;
85+
86+
fn try_from(value: &Opt) -> Result<Self, Self::Error> {
87+
if !value.delimiter.as_bytes().len() == 1 {
88+
return Err("Delimiter must be 1 byte wide for FastOpt");
89+
}
90+
91+
if value.complement
92+
|| value.greedy_delimiter
93+
|| value.compress_delimiter
94+
|| value.json
95+
|| value.bounds_type != BoundsType::Fields
96+
|| value.replace_delimiter.is_some()
97+
|| value.trim.is_some()
98+
|| value.regex_bag.is_some()
99+
// only_delimited can't be supported without reading the full line first
100+
// to search for delimiters, which can't be done if we read by chunks.
101+
|| value.only_delimited
102+
{
103+
return Err(
104+
"StreamOpt supports solely forward fields, join and single-character delimiters",
105+
);
106+
}
107+
108+
if let Ok(forward_bounds) = ForwardBounds::try_from(&value.bounds) {
109+
Ok(StreamOpt {
110+
delimiter: value.delimiter.as_bytes().first().unwrap().to_owned(),
111+
join: value.join,
112+
eol: value.eol,
113+
bounds: forward_bounds,
114+
})
115+
} else {
116+
Err("Bounds cannot be converted to ForwardBounds")
117+
}
118+
}
119+
}
120+
121+
pub fn read_and_cut_bytes_stream<R: BufRead, W: Write>(
122+
stdin: &mut R,
123+
stdout: &mut W,
124+
opt: &StreamOpt,
125+
) -> Result<()> {
126+
let last_interesting_field = opt.bounds.get_last_bound().r;
127+
cut_bytes_stream(stdin, stdout, opt, last_interesting_field)?;
128+
Ok(())
129+
}
130+
131+
#[inline(always)]
132+
fn print_field<W: Write>(
133+
stdin: &mut W,
134+
buffer: &[u8],
135+
delim: u8,
136+
prepend_delimiter: bool,
137+
) -> Result<()> {
138+
if prepend_delimiter {
139+
stdin.write_all(&[delim])?;
140+
}
141+
stdin.write_all(buffer)?;
142+
Ok(())
143+
}
144+
145+
fn cut_bytes_stream<R: BufRead, W: Write>(
146+
stdin: &mut R,
147+
stdout: &mut W,
148+
opt: &StreamOpt,
149+
last_interesting_field: Side,
150+
) -> Result<()> {
151+
//let mut buffer: Vec<u8> = Vec::with_capacity(64 * 1024);
152+
153+
let eol: u8 = opt.eol.into();
154+
155+
// With this algorithm we can only move forward, so we can't
156+
// support overlapping ranges
157+
// XXX TODO panic (or move to something different than FastOpt)
158+
159+
'outer: loop {
160+
// new line
161+
// dbg!("new line");
162+
163+
let mut bounds_idx = 0;
164+
let mut available;
165+
166+
let mut curr_field = 0;
167+
let mut go_to_next_line = false;
168+
let mut field_is_continuation = false;
169+
170+
let mut used;
171+
172+
'fields: loop {
173+
available = stdin.fill_buf()?;
174+
//let tmp = available.to_str_lossy();
175+
176+
if available.is_empty() {
177+
// end of file
178+
stdout.write_all(&[opt.eol.into()])?;
179+
break 'outer;
180+
}
181+
182+
let mut prev_idx = 0;
183+
for idx in memchr::memchr2_iter(opt.delimiter, eol, available) {
184+
used = idx + 1;
185+
186+
curr_field += 1;
187+
188+
if let Some(BoundOrFiller::Bound(b)) = opt.bounds.get(bounds_idx) {
189+
// TODO creates a dedicated match function
190+
if b.matches(curr_field).unwrap() {
191+
if field_is_continuation && idx == 0 {
192+
bounds_idx += 1;
193+
} else {
194+
print_field(
195+
stdout,
196+
&available[prev_idx..idx],
197+
//opt.delimiter,
198+
b'\t',
199+
!field_is_continuation
200+
&& curr_field > 1
201+
&& (opt.join || (b.l != b.r || b.r == Side::Continue)),
202+
)?;
203+
204+
if b.r == Side::Some(curr_field) {
205+
bounds_idx += 1;
206+
}
207+
}
208+
}
209+
}
210+
211+
field_is_continuation = false;
212+
213+
prev_idx = idx + 1;
214+
215+
if available[idx] == eol {
216+
// end of line reached
217+
break 'fields;
218+
}
219+
220+
if Side::Some(curr_field) == last_interesting_field {
221+
// There are no more fields we're interested in,
222+
// let's move to the next line
223+
go_to_next_line = true;
224+
break 'fields;
225+
}
226+
}
227+
228+
// We exhausted the buffer before reaching the next line, so
229+
// - there could be more fields to read
230+
// - the last byte was likely not a delimiter and there is the
231+
// start of a field still in the buffer
232+
233+
if !available[prev_idx..].is_empty() {
234+
curr_field += 1;
235+
236+
if let Some(BoundOrFiller::Bound(b)) = opt.bounds.get(bounds_idx) {
237+
if b.matches(curr_field).unwrap() {
238+
print_field(
239+
stdout,
240+
&available[prev_idx..],
241+
// opt.delimiter, false)?;
242+
b'\t',
243+
!field_is_continuation && curr_field > 1 && (opt.join),
244+
)?;
245+
}
246+
}
247+
248+
// the field was split in two parts, let's reset its counter
249+
curr_field -= 1;
250+
field_is_continuation = true;
251+
}
252+
253+
// We keep `curr_field` as-is, consume the buffer and read the next block
254+
255+
used = available.len();
256+
stdin.consume(used);
257+
}
258+
259+
// We consumed every field we were interested in in this line
260+
261+
if go_to_next_line {
262+
let mut idx = used - 1; // remove one. We know it wasn't a newline and
263+
// it ensure that the buffer is not empty during the first loop
264+
265+
// let mut must_read_more = true;
266+
loop {
267+
//if !must_read_more && available[idx..].is_empty() {
268+
if available[idx..].is_empty() {
269+
stdout.write_all(&[opt.eol.into()])?;
270+
break 'outer;
271+
}
272+
273+
if let Some(eol_idx) = memchr::memchr(eol, &available[idx..]) {
274+
used = idx + eol_idx + 1;
275+
break;
276+
}
277+
278+
// Whops, eol was not found in the current buffer. Let's read some more
279+
280+
used = available.len();
281+
stdin.consume(used);
282+
available = stdin.fill_buf()?;
283+
idx = 0;
284+
//must_read_more = true;
285+
}
286+
}
287+
288+
stdin.consume(used);
289+
290+
stdout.write_all(&[opt.eol.into()])?;
291+
}
292+
293+
Ok(())
294+
}

0 commit comments

Comments
 (0)