Skip to content

Commit 215513e

Browse files
committed
Add a new parser that does not buffer lines
1 parent d8e28d3 commit 215513e

File tree

2 files changed

+295
-0
lines changed

2 files changed

+295
-0
lines changed

src/bin/tuc.rs

+3
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use tuc::cut_lines::read_and_cut_lines;
99
use tuc::cut_str::read_and_cut_str;
1010
use tuc::help::{get_help, get_short_help};
1111
use tuc::options::{Opt, EOL};
12+
use tuc::stream::{read_and_cut_bytes_stream, StreamOpt};
1213

1314
#[cfg(feature = "fast-lane")]
1415
use tuc::fast_lane::{read_and_cut_text_as_bytes, FastOpt};
@@ -251,6 +252,8 @@ fn main() -> Result<()> {
251252
read_and_cut_bytes(&mut stdin, &mut stdout, &opt)?;
252253
} else if opt.bounds_type == BoundsType::Lines {
253254
read_and_cut_lines(&mut stdin, &mut stdout, &opt)?;
255+
} else if let Ok(stream_opt) = StreamOpt::try_from(&opt) {
256+
read_and_cut_bytes_stream(&mut stdin, &mut stdout, &stream_opt)?;
254257
} else if let Ok(fast_opt) = FastOpt::try_from(&opt) {
255258
read_and_cut_text_as_bytes(&mut stdin, &mut stdout, &fast_opt)?;
256259
} else {

src/stream.rs

+292
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,292 @@
1+
use crate::bounds::{BoundOrFiller, BoundsType, Side, UserBounds, UserBoundsList};
2+
use crate::options::{Opt, EOL};
3+
use anyhow::Result;
4+
use std::convert::TryFrom;
5+
use std::io::BufRead;
6+
use std::io::Write;
7+
use std::ops::Deref;
8+
9+
/// A `UserBoundsList` that has been checked to be forward-only and to contain
/// at least one actual bound (see the `TryFrom<&UserBoundsList>` impl).
#[derive(Debug)]
struct ForwardBounds {
    /// The validated list of bounds and fillers.
    pub list: UserBoundsList,
    /// Index inside `list` of the last entry that is a `BoundOrFiller::Bound`.
    last_bound_idx: usize,
}
14+
15+
impl TryFrom<&UserBoundsList> for ForwardBounds {
16+
type Error = &'static str;
17+
18+
fn try_from(value: &UserBoundsList) -> Result<Self, Self::Error> {
19+
if value.is_empty() {
20+
Err("Cannot create ForwardBounds from an empty UserBoundsList")
21+
} else if value.is_forward_only() {
22+
let value: UserBoundsList = UserBoundsList(value.iter().cloned().collect());
23+
let mut maybe_last_bound: Option<usize> = None;
24+
value.iter().enumerate().rev().any(|(idx, bof)| {
25+
if matches!(bof, BoundOrFiller::Bound(_)) {
26+
maybe_last_bound = Some(idx);
27+
true
28+
} else {
29+
false
30+
}
31+
});
32+
33+
if let Some(last_bound_idx) = maybe_last_bound {
34+
Ok(ForwardBounds {
35+
list: value,
36+
last_bound_idx,
37+
})
38+
} else {
39+
Err("Cannot create ForwardBounds from UserBoundsList without bounds")
40+
}
41+
} else {
42+
Err("The provided UserBoundsList is not forward only")
43+
}
44+
}
45+
}
46+
47+
/// Lets a `ForwardBounds` be used wherever a `&UserBoundsList` is expected.
impl Deref for ForwardBounds {
    type Target = UserBoundsList;

    /// Returns the wrapped list.
    fn deref(&self) -> &Self::Target {
        &self.list
    }
}
54+
55+
impl ForwardBounds {
56+
fn get_last_bound(&self) -> &UserBounds {
57+
if let Some(BoundOrFiller::Bound(b)) = self.list.get(self.last_bound_idx) {
58+
b
59+
} else {
60+
panic!("Invariant error: last_bound_idx failed to match a bound")
61+
}
62+
}
63+
}
64+
65+
/// Options accepted by the streaming cutter, built from `Opt` through
/// `TryFrom<&Opt>`.
#[derive(Debug)]
pub struct StreamOpt {
    /// Single-byte field delimiter.
    delimiter: u8,
    /// Copied from `Opt::join`.
    join: bool,
    /// End-of-line marker, copied from `Opt::eol`.
    eol: EOL,
    /// Forward-only bounds selecting the fields to output.
    bounds: ForwardBounds,
    // ## only_delimited: bool, ##
    // We can't support it, because we read fixed blocks of data.
    // If we don't find the delimiter in one block, move on to the next block
    // and find the delimiter there, we can no longer print the content of the
    // previous block: it's lost.
    // The alternative would be to start buffering blocks, but who knows
    // how much they'd grow: it would be no different from buffering the
    // whole line, and this mode is about doing the job with fixed memory.
}
80+
81+
impl TryFrom<&Opt> for StreamOpt {
82+
type Error = &'static str;
83+
84+
fn try_from(value: &Opt) -> Result<Self, Self::Error> {
85+
if !value.delimiter.as_bytes().len() == 1 {
86+
return Err("Delimiter must be 1 byte wide for FastOpt");
87+
}
88+
89+
if value.complement
90+
|| value.greedy_delimiter
91+
|| value.compress_delimiter
92+
|| value.json
93+
|| value.bounds_type != BoundsType::Fields
94+
|| value.replace_delimiter.is_some()
95+
|| value.trim.is_some()
96+
|| value.regex_bag.is_some()
97+
// only_delimited can't be supported without reading the full line first
98+
// to search for delimiters, which can't be done if we read by chunks.
99+
|| value.only_delimited
100+
{
101+
return Err(
102+
"StreamOpt supports solely forward fields, join and single-character delimiters",
103+
);
104+
}
105+
106+
if let Ok(forward_bounds) = ForwardBounds::try_from(&value.bounds) {
107+
Ok(StreamOpt {
108+
delimiter: value.delimiter.as_bytes().first().unwrap().to_owned(),
109+
join: value.join,
110+
eol: value.eol,
111+
bounds: forward_bounds,
112+
})
113+
} else {
114+
Err("Bounds cannot be converted to ForwardBounds")
115+
}
116+
}
117+
}
118+
119+
/// Cuts the fields selected by `opt` out of `stdin` and writes them to
/// `stdout`, processing the input in blocks.
///
/// # Errors
///
/// Returns whatever error `cut_bytes_stream` reports.
pub fn read_and_cut_bytes_stream<R: BufRead, W: Write>(
    stdin: &mut R,
    stdout: &mut W,
    opt: &StreamOpt,
) -> Result<()> {
    // Right side of the last bound, handed to `cut_bytes_stream` as the
    // last field of interest.
    let rightmost_side = opt.bounds.get_last_bound().r;
    cut_bytes_stream(stdin, stdout, opt, rightmost_side)
}
128+
129+
#[inline(always)]
130+
fn print_field<W: Write>(
131+
stdin: &mut W,
132+
buffer: &[u8],
133+
delim: u8,
134+
prepend_delimiter: bool,
135+
) -> Result<()> {
136+
if prepend_delimiter {
137+
stdin.write_all(&[delim])?;
138+
}
139+
stdin.write_all(buffer)?;
140+
Ok(())
141+
}
142+
143+
fn cut_bytes_stream<R: BufRead, W: Write>(
144+
stdin: &mut R,
145+
stdout: &mut W,
146+
opt: &StreamOpt,
147+
last_interesting_field: Side,
148+
) -> Result<()> {
149+
//let mut buffer: Vec<u8> = Vec::with_capacity(64 * 1024);
150+
151+
let eol: u8 = opt.eol.into();
152+
153+
// With this algorithm we can only move forward, so we can't
154+
// support overlapping ranges
155+
// XXX TODO panic (or move to something different than FastOpt)
156+
157+
'outer: loop {
158+
// new line
159+
// dbg!("new line");
160+
161+
let mut bounds_idx = 0;
162+
let mut available;
163+
164+
let mut curr_field = 0;
165+
let mut go_to_next_line = false;
166+
let mut field_is_continuation = false;
167+
168+
let mut used;
169+
170+
'fields: loop {
171+
available = stdin.fill_buf()?;
172+
//let tmp = available.to_str_lossy();
173+
174+
if available.is_empty() {
175+
// end of file
176+
stdout.write_all(&[opt.eol.into()])?;
177+
break 'outer;
178+
}
179+
180+
let mut prev_idx = 0;
181+
for idx in memchr::memchr2_iter(opt.delimiter, eol, available) {
182+
used = idx + 1;
183+
184+
curr_field += 1;
185+
186+
if let Some(BoundOrFiller::Bound(b)) = opt.bounds.get(bounds_idx) {
187+
// TODO creates a dedicated match function
188+
if b.matches(curr_field).unwrap() {
189+
if field_is_continuation && idx == 0 {
190+
bounds_idx += 1;
191+
} else {
192+
print_field(
193+
stdout,
194+
&available[prev_idx..idx],
195+
//opt.delimiter,
196+
b'\t',
197+
!field_is_continuation
198+
&& curr_field > 1
199+
&& (opt.join || (b.l != b.r || b.r == Side::Continue)),
200+
)?;
201+
202+
if b.r == Side::Some(curr_field) {
203+
bounds_idx += 1;
204+
}
205+
}
206+
}
207+
}
208+
209+
field_is_continuation = false;
210+
211+
prev_idx = idx + 1;
212+
213+
if available[idx] == eol {
214+
// end of line reached
215+
break 'fields;
216+
}
217+
218+
if Side::Some(curr_field) == last_interesting_field {
219+
// There are no more fields we're interested in,
220+
// let's move to the next line
221+
go_to_next_line = true;
222+
break 'fields;
223+
}
224+
}
225+
226+
// We exhausted the buffer before reaching the next line, so
227+
// - there could be more fields to read
228+
// - the last byte was likely not a delimiter and there is the
229+
// start of a field still in the buffer
230+
231+
if !available[prev_idx..].is_empty() {
232+
curr_field += 1;
233+
234+
if let Some(BoundOrFiller::Bound(b)) = opt.bounds.get(bounds_idx) {
235+
if b.matches(curr_field).unwrap() {
236+
print_field(
237+
stdout,
238+
&available[prev_idx..],
239+
// opt.delimiter, false)?;
240+
b'\t',
241+
!field_is_continuation && curr_field > 1 && (opt.join),
242+
)?;
243+
}
244+
}
245+
246+
// the field was split in two parts, let's reset its counter
247+
curr_field -= 1;
248+
field_is_continuation = true;
249+
}
250+
251+
// We keep `curr_field` as-is, consume the buffer and read the next block
252+
253+
used = available.len();
254+
stdin.consume(used);
255+
}
256+
257+
// We consumed every field we were interested in in this line
258+
259+
if go_to_next_line {
260+
let mut idx = used - 1; // remove one. We know it wasn't a newline and
261+
// it ensure that the buffer is not empty during the first loop
262+
263+
// let mut must_read_more = true;
264+
loop {
265+
//if !must_read_more && available[idx..].is_empty() {
266+
if available[idx..].is_empty() {
267+
stdout.write_all(&[opt.eol.into()])?;
268+
break 'outer;
269+
}
270+
271+
if let Some(eol_idx) = memchr::memchr(eol, &available[idx..]) {
272+
used = idx + eol_idx + 1;
273+
break;
274+
}
275+
276+
// Whops, eol was not found in the current buffer. Let's read some more
277+
278+
used = available.len();
279+
stdin.consume(used);
280+
available = stdin.fill_buf()?;
281+
idx = 0;
282+
//must_read_more = true;
283+
}
284+
}
285+
286+
stdin.consume(used);
287+
288+
stdout.write_all(&[opt.eol.into()])?;
289+
}
290+
291+
Ok(())
292+
}

0 commit comments

Comments
 (0)