Skip to content

Commit 3872328

Browse files
taiki-eNemo157
authored andcommitted
Correct the behavior of read_until when Pending is returned
1 parent 2e2b013 commit 3872328

File tree

2 files changed

+108
-22
lines changed

2 files changed

+108
-22
lines changed

futures-util/src/io/read_until.rs

+31-20
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use futures_core::future::Future;
22
use futures_core::task::{Context, Poll};
33
use futures_io::AsyncBufRead;
44
use std::io;
5+
use std::mem;
56
use std::pin::Pin;
67

78
/// Future for the [`read_until`](super::AsyncBufReadExt::read_until) method.
@@ -10,38 +11,48 @@ pub struct ReadUntil<'a, R: ?Sized + Unpin> {
1011
reader: &'a mut R,
1112
byte: u8,
1213
buf: &'a mut Vec<u8>,
14+
read: usize,
1315
}
1416

1517
impl<R: ?Sized + Unpin> Unpin for ReadUntil<'_, R> {}
1618

1719
impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadUntil<'a, R> {
1820
pub(super) fn new(reader: &'a mut R, byte: u8, buf: &'a mut Vec<u8>) -> Self {
19-
Self { reader, byte, buf }
21+
Self { reader, byte, buf, read: 0 }
22+
}
23+
}
24+
25+
fn read_until_internal<R: AsyncBufRead + ?Sized + Unpin>(
26+
mut reader: Pin<&mut R>,
27+
byte: u8,
28+
buf: &mut Vec<u8>,
29+
read: &mut usize,
30+
cx: &mut Context<'_>,
31+
) -> Poll<io::Result<usize>> {
32+
loop {
33+
let (done, used) = {
34+
let available = try_ready!(reader.as_mut().poll_fill_buf(cx));
35+
if let Some(i) = memchr::memchr(byte, available) {
36+
buf.extend_from_slice(&available[..=i]);
37+
(true, i + 1)
38+
} else {
39+
buf.extend_from_slice(available);
40+
(false, available.len())
41+
}
42+
};
43+
reader.as_mut().consume(used);
44+
*read += used;
45+
if done || used == 0 {
46+
return Poll::Ready(Ok(mem::replace(read, 0)));
47+
}
2048
}
2149
}
2250

2351
impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadUntil<'_, R> {
2452
type Output = io::Result<usize>;
2553

2654
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
27-
let this = &mut *self;
28-
let mut read = 0;
29-
loop {
30-
let (done, used) = {
31-
let available = try_ready!(Pin::new(&mut this.reader).poll_fill_buf(cx));
32-
if let Some(i) = memchr::memchr(this.byte, available) {
33-
this.buf.extend_from_slice(&available[..=i]);
34-
(true, i + 1)
35-
} else {
36-
this.buf.extend_from_slice(available);
37-
(false, available.len())
38-
}
39-
};
40-
Pin::new(&mut this.reader).consume(used);
41-
read += used;
42-
if done || used == 0 {
43-
return Poll::Ready(Ok(read));
44-
}
45-
}
55+
let Self { reader, byte, buf, read } = &mut *self;
56+
read_until_internal(Pin::new(reader), *byte, buf, read, cx)
4657
}
4758
}

futures/tests/io_read_until.rs

+77-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
use futures::executor::block_on;
2-
use futures::io::AsyncBufReadExt;
3-
use std::io::Cursor;
2+
use futures::future::Future;
3+
use futures::io::{AsyncRead, AsyncBufRead, AsyncBufReadExt};
4+
use futures::task::{Context, Poll};
5+
use futures_test::task::noop_context;
6+
use std::cmp;
7+
use std::io::{self, Cursor};
8+
use std::pin::Pin;
49

510
#[test]
611
fn read_until() {
@@ -20,3 +25,73 @@ fn read_until() {
2025
assert_eq!(block_on(buf.read_until(b'3', &mut v)).unwrap(), 0);
2126
assert_eq!(v, []);
2227
}
28+
29+
fn run<F: Future + Unpin>(mut f: F) -> F::Output {
30+
let mut cx = noop_context();
31+
loop {
32+
if let Poll::Ready(x) = Pin::new(&mut f).poll(&mut cx) {
33+
return x;
34+
}
35+
}
36+
}
37+
38+
struct MaybePending<'a> {
39+
inner: &'a [u8],
40+
ready: bool,
41+
}
42+
43+
impl<'a> MaybePending<'a> {
44+
fn new(inner: &'a [u8]) -> Self {
45+
Self { inner, ready: false }
46+
}
47+
}
48+
49+
impl AsyncRead for MaybePending<'_> {
50+
fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, _: &mut [u8])
51+
-> Poll<io::Result<usize>>
52+
{
53+
unimplemented!()
54+
}
55+
}
56+
57+
impl AsyncBufRead for MaybePending<'_> {
58+
fn poll_fill_buf<'a>(mut self: Pin<&'a mut Self>, _: &mut Context<'_>)
59+
-> Poll<io::Result<&'a [u8]>>
60+
{
61+
if self.ready {
62+
self.ready = false;
63+
if self.inner.is_empty() { return Poll::Ready(Ok(&[])) }
64+
let len = cmp::min(2, self.inner.len());
65+
Poll::Ready(Ok(&self.inner[0..len]))
66+
} else {
67+
self.ready = true;
68+
Poll::Pending
69+
}
70+
}
71+
72+
fn consume(mut self: Pin<&mut Self>, amt: usize) {
73+
self.inner = &self.inner[amt..];
74+
}
75+
}
76+
77+
#[test]
78+
fn maybe_pending() {
79+
let mut buf = MaybePending::new(b"12");
80+
let mut v = Vec::new();
81+
assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 2);
82+
assert_eq!(v, b"12");
83+
84+
let mut buf = MaybePending::new(b"12333");
85+
let mut v = Vec::new();
86+
assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 3);
87+
assert_eq!(v, b"123");
88+
v.clear();
89+
assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 1);
90+
assert_eq!(v, b"3");
91+
v.clear();
92+
assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 1);
93+
assert_eq!(v, b"3");
94+
v.clear();
95+
assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 0);
96+
assert_eq!(v, []);
97+
}

0 commit comments

Comments
 (0)