Skip to content

Replace parts of futures-util with std APIs #1233

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions postgres/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::{Error, Notification};
use futures_util::{future, pin_mut, Stream};
use futures_util::Stream;
use std::collections::VecDeque;
use std::future::Future;
use std::future::{self, Future};
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite};
Expand Down Expand Up @@ -52,7 +52,7 @@ impl Connection {
where
F: Future<Output = Result<T, Error>>,
{
pin_mut!(future);
let mut future = pin!(future);
self.poll_block_on(|cx, _, _| future.as_mut().poll(cx))
}

Expand Down
4 changes: 2 additions & 2 deletions postgres/src/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
use crate::connection::ConnectionRef;
use crate::{Error, Notification};
use fallible_iterator::FallibleIterator;
use futures_util::{ready, FutureExt};
use futures_util::FutureExt;
use std::pin::Pin;
use std::task::Poll;
use std::task::{ready, Poll};
use std::time::Duration;
use tokio::time::{self, Instant, Sleep};

Expand Down
4 changes: 2 additions & 2 deletions tokio-postgres/src/binary_copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::types::{FromSql, IsNull, ToSql, Type, WrongType};
use crate::{slice_iter, CopyInSink, CopyOutStream, Error};
use byteorder::{BigEndian, ByteOrder};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures_util::{ready, SinkExt, Stream};
use futures_util::{SinkExt, Stream};
use pin_project_lite::pin_project;
use postgres_types::BorrowToSql;
use std::convert::TryFrom;
Expand All @@ -13,7 +13,7 @@ use std::io::Cursor;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};

const MAGIC: &[u8] = b"PGCOPY\n\xff\r\n\0";
const HEADER_LEN: usize = MAGIC.len() + 4 + 4;
Expand Down
23 changes: 12 additions & 11 deletions tokio-postgres/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@ use crate::{
use bytes::{Buf, BytesMut};
use fallible_iterator::FallibleIterator;
use futures_channel::mpsc;
use futures_util::{future, pin_mut, ready, StreamExt, TryStreamExt};
use futures_util::{ready, StreamExt, TryStreamExt};
use parking_lot::Mutex;
use postgres_protocol::message::backend::Message;
use postgres_types::BorrowToSql;
use std::collections::HashMap;
use std::fmt;
use std::future;
#[cfg(feature = "runtime")]
use std::net::IpAddr;
#[cfg(feature = "runtime")]
use std::path::PathBuf;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll};
#[cfg(feature = "runtime")]
Expand Down Expand Up @@ -300,8 +302,7 @@ impl Client {
where
T: ?Sized + ToStatement,
{
let stream = self.query_raw(statement, slice_iter(params)).await?;
pin_mut!(stream);
let mut stream = pin!(self.query_raw(statement, slice_iter(params)).await?);

let mut first = None;

Expand Down Expand Up @@ -336,18 +337,18 @@ impl Client {
///
/// ```no_run
/// # async fn async_main(client: &tokio_postgres::Client) -> Result<(), tokio_postgres::Error> {
/// use futures_util::{pin_mut, TryStreamExt};
/// use std::pin::pin;
/// use futures_util::TryStreamExt;
///
/// let params: Vec<String> = vec![
/// "first param".into(),
/// "second param".into(),
/// ];
/// let mut it = client.query_raw(
/// let mut it = pin!(client.query_raw(
/// "SELECT foo FROM bar WHERE biz = $1 AND baz = $2",
/// params,
/// ).await?;
/// ).await?);
///
/// pin_mut!(it);
/// while let Some(row) = it.try_next().await? {
/// let foo: i32 = row.get("foo");
/// println!("foo: {}", foo);
Expand Down Expand Up @@ -402,19 +403,19 @@ impl Client {
///
/// ```no_run
/// # async fn async_main(client: &tokio_postgres::Client) -> Result<(), tokio_postgres::Error> {
/// use futures_util::{pin_mut, TryStreamExt};
/// use std::pin::pin;
/// use futures_util::{TryStreamExt};
/// use tokio_postgres::types::Type;
///
/// let params: Vec<(String, Type)> = vec![
/// ("first param".into(), Type::TEXT),
/// ("second param".into(), Type::TEXT),
/// ];
/// let mut it = client.query_typed_raw(
/// let mut it = pin!(client.query_typed_raw(
/// "SELECT foo FROM bar WHERE biz = $1 AND baz = $2",
/// params,
/// ).await?;
/// ).await?);
///
/// pin_mut!(it);
/// while let Some(row) = it.try_next().await? {
/// let foo: i32 = row.get("foo");
/// println!("foo: {}", foo);
Expand Down
24 changes: 13 additions & 11 deletions tokio-postgres/src/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ use crate::connect_raw::connect_raw;
use crate::connect_socket::connect_socket;
use crate::tls::MakeTlsConnect;
use crate::{Client, Config, Connection, Error, SimpleQueryMessage, Socket};
use futures_util::{future, pin_mut, Future, FutureExt, Stream};
use futures_util::{FutureExt, Stream};
use rand::seq::SliceRandom;
use std::future::{self, Future};
use std::pin::pin;
use std::task::Poll;
use std::{cmp, io};
use tokio::net;
Expand Down Expand Up @@ -161,18 +163,18 @@ where
let (mut client, mut connection) = connect_raw(socket, tls, has_hostname, config).await?;

if config.target_session_attrs != TargetSessionAttrs::Any {
let rows = client.simple_query_raw("SHOW transaction_read_only");
pin_mut!(rows);
let mut rows = pin!(client.simple_query_raw("SHOW transaction_read_only"));

let rows = future::poll_fn(|cx| {
if connection.poll_unpin(cx)?.is_ready() {
return Poll::Ready(Err(Error::closed()));
}
let mut rows = pin!(
future::poll_fn(|cx| {
if connection.poll_unpin(cx)?.is_ready() {
return Poll::Ready(Err(Error::closed()));
}

rows.as_mut().poll(cx)
})
.await?;
pin_mut!(rows);
rows.as_mut().poll(cx)
})
.await?
);

loop {
let next = future::poll_fn(|cx| {
Expand Down
4 changes: 2 additions & 2 deletions tokio-postgres/src/connect_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{Client, Connection, Error};
use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
use futures_channel::mpsc;
use futures_util::{ready, Sink, SinkExt, Stream, TryStreamExt};
use futures_util::{Sink, SinkExt, Stream, TryStreamExt};
use postgres_protocol::authentication;
use postgres_protocol::authentication::sasl;
use postgres_protocol::authentication::sasl::ScramSha256;
Expand All @@ -17,7 +17,7 @@ use std::borrow::Cow;
use std::collections::{HashMap, VecDeque};
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::codec::Framed;

Expand Down
4 changes: 2 additions & 2 deletions tokio-postgres/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ use crate::{AsyncMessage, Error, Notification};
use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
use futures_channel::mpsc;
use futures_util::{ready, stream::FusedStream, Sink, Stream, StreamExt};
use futures_util::{stream::FusedStream, Sink, Stream, StreamExt};
use log::{info, trace};
use postgres_protocol::message::backend::Message;
use postgres_protocol::message::frontend;
use std::collections::{HashMap, VecDeque};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::codec::Framed;

Expand Down
5 changes: 3 additions & 2 deletions tokio-postgres/src/copy_in.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ use crate::query::extract_row_affected;
use crate::{query, slice_iter, Error, Statement};
use bytes::{Buf, BufMut, BytesMut};
use futures_channel::mpsc;
use futures_util::{future, ready, Sink, SinkExt, Stream, StreamExt};
use futures_util::{Sink, SinkExt, Stream, StreamExt};
use log::debug;
use pin_project_lite::pin_project;
use postgres_protocol::message::backend::Message;
use postgres_protocol::message::frontend;
use postgres_protocol::message::frontend::CopyData;
use std::future;
use std::marker::{PhantomData, PhantomPinned};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};

enum CopyInMessage {
Message(FrontendMessage),
Expand Down
4 changes: 2 additions & 2 deletions tokio-postgres/src/copy_out.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::{query, slice_iter, Error, Statement};
use bytes::Bytes;
use futures_util::{ready, Stream};
use futures_util::Stream;
use log::debug;
use pin_project_lite::pin_project;
use postgres_protocol::message::backend::Message;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};

pub async fn copy_out(client: &InnerClient, statement: Statement) -> Result<CopyOutStream, Error> {
debug!("executing copy out statement {}", statement.name());
Expand Down
7 changes: 3 additions & 4 deletions tokio-postgres/src/prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ use crate::{query, slice_iter};
use crate::{Column, Error, Statement};
use bytes::Bytes;
use fallible_iterator::FallibleIterator;
use futures_util::{pin_mut, TryStreamExt};
use futures_util::TryStreamExt;
use log::debug;
use postgres_protocol::message::backend::Message;
use postgres_protocol::message::frontend;
use std::future::Future;
use std::pin::Pin;
use std::pin::{pin, Pin};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

Expand Down Expand Up @@ -142,8 +142,7 @@ pub(crate) async fn get_type(client: &Arc<InnerClient>, oid: Oid) -> Result<Type

let stmt = typeinfo_statement(client).await?;

let rows = query::query(client, stmt, slice_iter(&[&oid])).await?;
pin_mut!(rows);
let mut rows = pin!(query::query(client, stmt, slice_iter(&[&oid])).await?);

let row = match rows.try_next().await? {
Some(row) => row,
Expand Down
4 changes: 2 additions & 2 deletions tokio-postgres/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::types::{BorrowToSql, IsNull};
use crate::{Column, Error, Portal, Row, Statement};
use bytes::{Bytes, BytesMut};
use fallible_iterator::FallibleIterator;
use futures_util::{ready, Stream};
use futures_util::Stream;
use log::{debug, log_enabled, Level};
use pin_project_lite::pin_project;
use postgres_protocol::message::backend::{CommandCompleteBody, Message};
Expand All @@ -16,7 +16,7 @@ use std::fmt;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};

struct BorrowToSqlParamsDebug<'a, T>(&'a [T]);

Expand Down
4 changes: 2 additions & 2 deletions tokio-postgres/src/simple_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ use crate::query::extract_row_affected;
use crate::{Error, SimpleQueryMessage, SimpleQueryRow};
use bytes::Bytes;
use fallible_iterator::FallibleIterator;
use futures_util::{ready, Stream};
use futures_util::Stream;
use log::debug;
use pin_project_lite::pin_project;
use postgres_protocol::message::backend::Message;
use postgres_protocol::message::frontend;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};

/// Information about a column of a single query row.
#[derive(Debug)]
Expand Down
12 changes: 5 additions & 7 deletions tokio-postgres/tests/test/binary_copy.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::connect;
use futures_util::{pin_mut, TryStreamExt};
use futures_util::TryStreamExt;
use std::pin::pin;
use tokio_postgres::binary_copy::{BinaryCopyInWriter, BinaryCopyOutStream};
use tokio_postgres::types::Type;

Expand All @@ -16,8 +17,7 @@ async fn write_basic() {
.copy_in("COPY foo (id, bar) FROM STDIN BINARY")
.await
.unwrap();
let writer = BinaryCopyInWriter::new(sink, &[Type::INT4, Type::TEXT]);
pin_mut!(writer);
let mut writer = pin!(BinaryCopyInWriter::new(sink, &[Type::INT4, Type::TEXT]));
writer.as_mut().write(&[&1i32, &"foobar"]).await.unwrap();
writer
.as_mut()
Expand Down Expand Up @@ -50,8 +50,7 @@ async fn write_many_rows() {
.copy_in("COPY foo (id, bar) FROM STDIN BINARY")
.await
.unwrap();
let writer = BinaryCopyInWriter::new(sink, &[Type::INT4, Type::TEXT]);
pin_mut!(writer);
let mut writer = pin!(BinaryCopyInWriter::new(sink, &[Type::INT4, Type::TEXT]));

for i in 0..10_000i32 {
writer
Expand Down Expand Up @@ -86,8 +85,7 @@ async fn write_big_rows() {
.copy_in("COPY foo (id, bar) FROM STDIN BINARY")
.await
.unwrap();
let writer = BinaryCopyInWriter::new(sink, &[Type::INT4, Type::BYTEA]);
pin_mut!(writer);
let mut writer = pin!(BinaryCopyInWriter::new(sink, &[Type::INT4, Type::BYTEA]));

for i in 0..2i32 {
writer
Expand Down
16 changes: 6 additions & 10 deletions tokio-postgres/tests/test/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@

use bytes::{Bytes, BytesMut};
use futures_channel::mpsc;
use futures_util::{
future, join, pin_mut, stream, try_join, Future, FutureExt, SinkExt, StreamExt, TryStreamExt,
};
use futures_util::{join, stream, try_join, FutureExt, SinkExt, StreamExt, TryStreamExt};
use pin_project_lite::pin_project;
use std::fmt::Write;
use std::pin::Pin;
use std::future::{self, Future};
use std::pin::{pin, Pin};
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::net::TcpStream;
Expand Down Expand Up @@ -589,8 +588,7 @@ async fn copy_in() {
.into_iter()
.map(Ok::<_, Error>),
);
let sink = client.copy_in("COPY foo FROM STDIN").await.unwrap();
pin_mut!(sink);
let mut sink = pin!(client.copy_in("COPY foo FROM STDIN").await.unwrap());
sink.send_all(&mut stream).await.unwrap();
let rows = sink.finish().await.unwrap();
assert_eq!(rows, 2);
Expand Down Expand Up @@ -636,8 +634,7 @@ async fn copy_in_large() {
.map(Ok::<_, Error>),
);

let sink = client.copy_in("COPY foo FROM STDIN").await.unwrap();
pin_mut!(sink);
let mut sink = pin!(client.copy_in("COPY foo FROM STDIN").await.unwrap());
sink.send_all(&mut stream).await.unwrap();
let rows = sink.finish().await.unwrap();
assert_eq!(rows, 10_000);
Expand All @@ -658,8 +655,7 @@ async fn copy_in_error() {
.unwrap();

{
let sink = client.copy_in("COPY foo FROM STDIN").await.unwrap();
pin_mut!(sink);
let mut sink = pin!(client.copy_in("COPY foo FROM STDIN").await.unwrap());
sink.send(Bytes::from_static(b"1\tsteven")).await.unwrap();
}

Expand Down