Skip to content

Commit

Permalink
refactor(common): remove common re-export (#3346)
Browse files Browse the repository at this point in the history
  • Loading branch information
tottoto authored Oct 18, 2023
1 parent 8ebe25c commit d9216c3
Show file tree
Hide file tree
Showing 19 changed files with 156 additions and 151 deletions.
12 changes: 7 additions & 5 deletions src/body/incoming.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::Bytes;
use futures_channel::mpsc;
Expand All @@ -8,8 +11,7 @@ use http::HeaderMap;
use http_body::{Body, Frame, SizeHint};

use super::DecodedLength;
use crate::common::Future;
use crate::common::{task, watch, Pin, Poll};
use crate::common::watch;
#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
use crate::proto::h2::ping;

Expand Down Expand Up @@ -157,7 +159,7 @@ impl Body for Incoming {

fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match self.kind {
Kind::Empty => Poll::Ready(None),
Expand Down Expand Up @@ -287,15 +289,15 @@ impl fmt::Debug for Incoming {

impl Sender {
/// Check to see if this `Sender` can send more data.
pub(crate) fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
// Check if the receiver end has tried polling for the body yet
ready!(self.poll_want(cx)?);
self.data_tx
.poll_ready(cx)
.map_err(|_| crate::Error::new_closed())
}

fn poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
fn poll_want(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
match self.want_rx.load(cx) {
WANT_READY => Poll::Ready(Ok(())),
WANT_PENDING => Poll::Pending,
Expand Down
10 changes: 6 additions & 4 deletions src/client/conn/http1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
use std::error::Error as StdError;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use crate::rt::{Read, Write};
use bytes::Bytes;
Expand All @@ -10,7 +13,6 @@ use httparse::ParserConfig;

use super::super::dispatch;
use crate::body::{Body, Incoming as IncomingBody};
use crate::common::{task, Future, Pin, Poll};
use crate::proto;
use crate::upgrade::Upgraded;

Expand Down Expand Up @@ -84,7 +86,7 @@ where
/// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html)
/// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html)
/// to work with this function; or use the `without_shutdown` wrapper.
pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
self.inner
.as_mut()
.expect("already upgraded")
Expand Down Expand Up @@ -128,7 +130,7 @@ impl<B> SendRequest<B> {
/// Polls to determine whether this sender can be used yet for a request.
///
/// If the associated connection is closed, this returns an Error.
pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
self.dispatch.poll_ready(cx)
}

Expand Down Expand Up @@ -254,7 +256,7 @@ where
{
type Output = crate::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match ready!(Pin::new(self.inner.as_mut().unwrap()).poll(cx))? {
proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
proto::Dispatched::Upgrade(pending) => match self.inner.take() {
Expand Down
8 changes: 5 additions & 3 deletions src/client/conn/http2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
use std::error::Error;
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use crate::rt::{Read, Write};
Expand All @@ -12,7 +15,6 @@ use http::{Request, Response};
use super::super::dispatch;
use crate::body::{Body, Incoming as IncomingBody};
use crate::common::time::Time;
use crate::common::{task, Future, Pin, Poll};
use crate::proto;
use crate::rt::bounds::ExecutorClient;
use crate::rt::Timer;
Expand Down Expand Up @@ -79,7 +81,7 @@ impl<B> SendRequest<B> {
/// Polls to determine whether this sender can be used yet for a request.
///
/// If the associated connection is closed, this returns an Error.
pub fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
pub fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
if self.is_closed() {
Poll::Ready(Err(crate::Error::new_closed()))
} else {
Expand Down Expand Up @@ -236,7 +238,7 @@ where
{
type Output = crate::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match ready!(Pin::new(&mut self.inner.1).poll(cx))? {
proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
#[cfg(feature = "http1")]
Expand Down
19 changes: 8 additions & 11 deletions src/client/dispatch.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::task::{Context, Poll};
#[cfg(feature = "http2")]
use std::future::Future;
use std::{future::Future, pin::Pin};

#[cfg(feature = "http2")]
use http::{Request, Response};
Expand All @@ -9,9 +10,8 @@ use http_body::Body;
use pin_project_lite::pin_project;
use tokio::sync::{mpsc, oneshot};

use crate::common::{task, Poll};
#[cfg(feature = "http2")]
use crate::{body::Incoming, common::Pin, proto::h2::client::ResponseFutMap};
use crate::{body::Incoming, proto::h2::client::ResponseFutMap};

#[cfg(test)]
pub(crate) type RetryPromise<T, U> = oneshot::Receiver<Result<U, (crate::Error, Option<T>)>>;
Expand Down Expand Up @@ -62,7 +62,7 @@ pub(crate) struct UnboundedSender<T, U> {

impl<T, U> Sender<T, U> {
#[cfg(feature = "http1")]
pub(crate) fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
self.giver
.poll_want(cx)
.map_err(|_| crate::Error::new_closed())
Expand Down Expand Up @@ -169,10 +169,7 @@ pub(crate) struct Receiver<T, U> {
}

impl<T, U> Receiver<T, U> {
pub(crate) fn poll_recv(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Option<(T, Callback<T, U>)>> {
pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<(T, Callback<T, U>)>> {
match self.inner.poll_recv(cx) {
Poll::Ready(item) => {
Poll::Ready(item.map(|mut env| env.0.take().expect("envelope not dropped")))
Expand Down Expand Up @@ -261,7 +258,7 @@ impl<T, U> Callback<T, U> {
}
}

pub(crate) fn poll_canceled(&mut self, cx: &mut task::Context<'_>) -> Poll<()> {
pub(crate) fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> {
match *self {
Callback::Retry(Some(ref mut tx)) => tx.poll_closed(cx),
Callback::NoRetry(Some(ref mut tx)) => tx.poll_closed(cx),
Expand Down Expand Up @@ -302,7 +299,7 @@ where
{
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

let mut call_back = this.call_back.take().expect("polled after complete");
Expand All @@ -319,7 +316,7 @@ where
Poll::Pending => {
// Move call_back back to struct before return
this.call_back.set(Some(call_back));
return std::task::Poll::Pending;
return Poll::Pending;
}
};
trace!("send_when canceled");
Expand Down
13 changes: 7 additions & 6 deletions src/common/io/rewind.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::marker::Unpin;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{cmp, io};

use bytes::{Buf, Bytes};

use crate::common::{task, Pin, Poll};
use crate::rt::{Read, ReadBufCursor, Write};

/// Combine a buffer with an IO, rewinding reads to use the buffer.
Expand Down Expand Up @@ -50,7 +51,7 @@ where
{
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
cx: &mut Context<'_>,
mut buf: ReadBufCursor<'_>,
) -> Poll<io::Result<()>> {
if let Some(mut prefix) = self.pre.take() {
Expand Down Expand Up @@ -78,25 +79,25 @@ where
{
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.inner).poll_write(cx, buf)
}

fn poll_write_vectored(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.inner).poll_write_vectored(cx, bufs)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_flush(cx)
}

fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_shutdown(cx)
}

Expand Down
5 changes: 0 additions & 5 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,3 @@ pub(crate) mod task;
))]
pub(crate) mod time;
pub(crate) mod watch;

pub(crate) use self::task::Poll;

// group up types normally needed for `Future`
pub(crate) use std::{future::Future, pin::Pin};
3 changes: 2 additions & 1 deletion src/common/task.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub(crate) use std::task::{Context, Poll};
#[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))]
use std::task::{Context, Poll};

/// A function to help "yield" a future, such that it is re-scheduled immediately.
///
Expand Down
28 changes: 13 additions & 15 deletions src/proto/h1/conn.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::fmt;
use std::io;
use std::marker::{PhantomData, Unpin};
use std::pin::Pin;
use std::task::{Context, Poll};
#[cfg(feature = "server")]
use std::time::Duration;

Expand All @@ -15,7 +17,6 @@ use super::{Decoder, Encode, EncodedBuf, Encoder, Http1Transaction, ParseContext
use crate::body::DecodedLength;
#[cfg(feature = "server")]
use crate::common::time::Time;
use crate::common::{task, Pin, Poll};
use crate::headers::connection_keep_alive;
use crate::proto::{BodyLength, MessageHead};
#[cfg(feature = "server")]
Expand Down Expand Up @@ -193,7 +194,7 @@ where

pub(super) fn poll_read_head(
&mut self,
cx: &mut task::Context<'_>,
cx: &mut Context<'_>,
) -> Poll<Option<crate::Result<(MessageHead<T::Incoming>, DecodedLength, Wants)>>> {
debug_assert!(self.can_read_head());
trace!("Conn::read_head");
Expand Down Expand Up @@ -294,7 +295,7 @@ where

pub(crate) fn poll_read_body(
&mut self,
cx: &mut task::Context<'_>,
cx: &mut Context<'_>,
) -> Poll<Option<io::Result<Bytes>>> {
debug_assert!(self.can_read_body());

Expand Down Expand Up @@ -355,10 +356,7 @@ where
ret
}

pub(crate) fn poll_read_keep_alive(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<crate::Result<()>> {
pub(crate) fn poll_read_keep_alive(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
debug_assert!(!self.can_read_head() && !self.can_read_body());

if self.is_read_closed() {
Expand All @@ -381,7 +379,7 @@ where
//
// This should only be called for Clients wanting to enter the idle
// state.
fn require_empty_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
fn require_empty_read(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
debug_assert!(!self.is_mid_message());
debug_assert!(T::is_client());
Expand Down Expand Up @@ -414,7 +412,7 @@ where
Poll::Ready(Err(crate::Error::new_unexpected_message()))
}

fn mid_message_detect_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
fn mid_message_detect_eof(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
debug_assert!(self.is_mid_message());

Expand All @@ -433,7 +431,7 @@ where
}
}

fn force_io_read(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<usize>> {
fn force_io_read(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
debug_assert!(!self.state.is_read_closed());

let result = ready!(self.io.poll_read_from_io(cx));
Expand All @@ -444,7 +442,7 @@ where
}))
}

fn maybe_notify(&mut self, cx: &mut task::Context<'_>) {
fn maybe_notify(&mut self, cx: &mut Context<'_>) {
// its possible that we returned NotReady from poll() without having
// exhausted the underlying Io. We would have done this when we
// determined we couldn't keep reading until we knew how writing
Expand Down Expand Up @@ -491,7 +489,7 @@ where
}
}

fn try_keep_alive(&mut self, cx: &mut task::Context<'_>) {
fn try_keep_alive(&mut self, cx: &mut Context<'_>) {
self.state.try_keep_alive::<T>();
self.maybe_notify(cx);
}
Expand Down Expand Up @@ -716,14 +714,14 @@ where
Err(err)
}

pub(crate) fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
pub(crate) fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
ready!(Pin::new(&mut self.io).poll_flush(cx))?;
self.try_keep_alive(cx);
trace!("flushed({}): {:?}", T::LOG, self.state);
Poll::Ready(Ok(()))
}

pub(crate) fn poll_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
pub(crate) fn poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match ready!(Pin::new(self.io.io_mut()).poll_shutdown(cx)) {
Ok(()) => {
trace!("shut down IO complete");
Expand All @@ -737,7 +735,7 @@ where
}

/// If the read side can be cheaply drained, do so. Otherwise, close.
pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut task::Context<'_>) {
pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut Context<'_>) {
if let Reading::Continue(ref decoder) = self.state.reading {
// skip sending the 100-continue
// just move forward to a read, in case a tiny body was included
Expand Down
Loading

0 comments on commit d9216c3

Please sign in to comment.