From 80ca942140d156232286c3ac106ce7ffdf3fcc36 Mon Sep 17 00:00:00 2001 From: Alexey Orlenko Date: Wed, 15 Jan 2025 19:05:23 +0100 Subject: [PATCH] fix(schema-engine): drop buggy ws_stream_tungstenite dependency Fixes: https://linear.app/prisma-company/issue/ORM-504/fix-ws-stream-tungstenite-error-in-migrate --- Cargo.lock | 232 ++++++++---------- quaint/Cargo.toml | 15 +- .../connector/postgres/native/websocket.rs | 131 +++++++++- 3 files changed, 227 insertions(+), 151 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1a6bfb3aa1e..8c1470bd154 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -167,7 +167,7 @@ checksum = "d57d4cec3c647232e1094dc013546c0b33ce785d8aeb251e1f20dfaf8a9a13fe" dependencies = [ "futures-util", "native-tls", - "thiserror", + "thiserror 1.0.44", "url", ] @@ -179,35 +179,7 @@ checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", -] - -[[package]] -name = "async-tungstenite" -version = "0.28.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90e661b6cb0a6eb34d02c520b052daa3aa9ac0cc02495c9d066bbce13ead132b" -dependencies = [ - "futures-io", - "futures-util", - "log", - "native-tls", - "pin-project-lite", - "tokio", - "tokio-native-tls", - "tungstenite", -] - -[[package]] -name = "async_io_stream" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6d7b9decdf35d8908a7e3ef02f64c5e9b1695e230154c0e8de3969142d9b94c" -dependencies = [ - "futures", - "pharos", - "rustc_version", - "tokio", + "syn 2.0.96", ] [[package]] @@ -521,9 +493,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" [[package]] name = "bytes" -version = "1.7.2" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "428d9aa8fbc0670b7b8d6030a7fadd0f86151cae55e4dbbece15f3780a3dfaf3" +checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b" [[package]] name = "cast" @@ -739,7 +711,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f76990911f2267d837d9d0ad060aa63aaad170af40904b29461734c339030d4d" dependencies = [ "quote", - "syn 2.0.58", + "syn 2.0.96", ] [[package]] @@ -966,7 +938,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1f34ba9a9bcb8645379e9de8cb3ecfcf4d1c85ba66d90deb3259206fa5aa193b" dependencies = [ "quote", - "syn 2.0.58", + "syn 2.0.96", ] [[package]] @@ -1084,7 +1056,7 @@ dependencies = [ "proc-macro2", "quote", "strsim 0.11.1", - "syn 2.0.58", + "syn 2.0.96", ] [[package]] @@ -1117,7 +1089,7 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" dependencies = [ "darling_core 0.20.10", "quote", - "syn 2.0.58", + "syn 2.0.96", ] [[package]] @@ -1312,7 +1284,7 @@ dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.96", ] [[package]] @@ -1333,7 +1305,7 @@ checksum = "5e9a1f9f7d83e59740248a6e14ecf93929ade55027844dfcea78beafccc15745" dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.96", ] [[package]] @@ -1525,7 +1497,7 @@ checksum = "b0fa992f1656e1707946bbba340ad244f0814009ef8c0118eb7b658395f19a2e" dependencies = [ "frunk_proc_macro_helpers", "quote", - "syn 2.0.58", + "syn 2.0.96", ] [[package]] @@ -1537,7 +1509,7 @@ dependencies = [ "frunk_core", "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.96", ] [[package]] @@ -1549,7 +1521,7 @@ dependencies = [ "frunk_core", "frunk_proc_macro_helpers", "quote", - "syn 2.0.58", + "syn 2.0.96", ] [[package]] @@ -1631,7 +1603,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.96", ] [[package]] @@ -1736,7 +1708,7 @@ source = "git+https://github.com/prisma/graphql-parser#6a3f58bd879065588e710cb02 dependencies = [ "combine", "indexmap 1.9.3", - "thiserror", + "thiserror 1.0.44", ] [[package]] @@ -1845,7 +1817,7 @@ dependencies = [ "ipnet", "once_cell", "rand 0.8.5", - "thiserror", + "thiserror 1.0.44", "tinyvec", "tokio", "tracing", @@ -1868,7 +1840,7 @@ dependencies = [ "rand 0.8.5", "resolv-conf", "smallvec", - "thiserror", + "thiserror 1.0.44", "tokio", "tracing", ] @@ -2486,7 +2458,7 @@ dependencies = [ "metrics", "metrics-util", "quanta", - "thiserror", + "thiserror 1.0.44", ] [[package]] @@ -2554,7 +2526,7 @@ dependencies = [ "futures-util", "log", "metrics", - "thiserror", + "thiserror 1.0.44", "tokio", "tracing", "tracing-subscriber", @@ -2598,7 +2570,7 @@ dependencies = [ "stringprep", "strsim 0.11.1", "take_mut", - "thiserror", + "thiserror 1.0.44", "tokio", "tokio-rustls", "tokio-util", @@ -2614,7 +2586,7 @@ dependencies = [ "mongodb", "once_cell", "percent-encoding", - "thiserror", + "thiserror 1.0.44", ] [[package]] @@ -2624,7 +2596,7 @@ source = "git+https://github.com/prisma/mongo-rust-driver.git?branch=RUST-1994%2 dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.96", ] [[package]] @@ -2653,7 +2625,7 @@ dependencies = [ "serde", "serde_json", "telemetry", - "thiserror", + "thiserror 1.0.44", "tokio", "tracing", "tracing-futures", @@ -2726,7 +2698,7 @@ dependencies = [ "serde", "serde_json", "socket2 0.4.9", - "thiserror", + "thiserror 1.0.44", "tokio", "tokio-native-tls", "tokio-util", @@ -2766,7 +2738,7 @@ dependencies = [ "sha2 0.10.7", "smallvec", "subprocess", - "thiserror", + "thiserror 1.0.44", "time", "uuid", ] @@ -2831,7 +2803,7 @@ dependencies = [ "napi-derive-backend", "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.96", ] [[package]] @@ -2846,7 +2818,7 @@ dependencies = [ "quote", "regex", "semver", - "syn 2.0.58", + "syn 2.0.96", ] [[package]] @@ -3047,7 +3019,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.96", ] [[package]] @@ -3157,7 +3129,7 @@ dependencies = [ "html-escape", "nom", "percent-encoding", - "thiserror", + "thiserror 1.0.44", ] [[package]] @@ -3216,7 +3188,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "560131c633294438da9f7c4b08189194b20946c8274c6b9e38881a7874dc8ee8" dependencies = [ "memchr", - "thiserror", + "thiserror 1.0.44", "ucd-trie", ] @@ -3240,7 +3212,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.96", ] [[package]] @@ -3264,16 +3236,6 @@ dependencies = [ "ordermap", ] -[[package]] -name = "pharos" -version = "0.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9567389417feee6ce15dd6527a8a1ecac205ef62c2932bcf3d9f6fc5b78b414" -dependencies = [ - "futures", - "rustc_version", -] - [[package]] name = "phf" version = "0.11.2" @@ -3309,7 +3271,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.96", ] [[package]] @@ -3550,9 +3512,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" [[package]] name = "proc-macro2" -version = "1.0.78" +version = "1.0.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" +checksum = "60946a68e5f9d28b0dc1c21bb8a97ee7d018a8b322fa57838ba31cc878e22d99" dependencies = [ "unicode-ident", ] @@ -3639,7 +3601,6 @@ name = "quaint" version = "0.2.0-alpha.13" dependencies = [ "async-trait", - "async-tungstenite", "base64 0.12.3", "bigdecimal", "bit-vec", @@ -3667,6 +3628,7 @@ dependencies = [ "once_cell", "paste", "percent-encoding", + "pin-project", "postgres-native-tls", "postgres-types", "prisma-metrics", @@ -3678,17 +3640,17 @@ dependencies = [ "serde_json", "sqlformat", "telemetry", - "thiserror", + "thiserror 1.0.44", "tiberius", "tokio", "tokio-postgres", + "tokio-tungstenite", "tokio-util", "tracing", "tracing-futures", "tracing-test", "url", "uuid", - "ws_stream_tungstenite", ] [[package]] @@ -3753,7 +3715,7 @@ dependencies = [ "serde", "serde_json", "telemetry", - "thiserror", + "thiserror 1.0.44", "user-facing-errors", "uuid", ] @@ -3787,7 +3749,7 @@ dependencies = [ "serde_json", "sql-query-builder", "telemetry", - "thiserror", + "thiserror 1.0.44", "tokio", "tracing", "tracing-futures", @@ -3823,7 +3785,7 @@ dependencies = [ "sql-query-connector", "structopt", "telemetry", - "thiserror", + "thiserror 1.0.44", "tokio", "tracing", "tracing-subscriber", @@ -3856,7 +3818,7 @@ dependencies = [ "serde_json", "sql-query-connector", "telemetry", - "thiserror", + "thiserror 1.0.44", "tokio", "tracing", "tracing-futures", @@ -3880,7 +3842,7 @@ dependencies = [ "serde", "serde_json", "telemetry", - "thiserror", + "thiserror 1.0.44", "tracing", "tracing-futures", "tracing-subscriber", @@ -3915,7 +3877,7 @@ dependencies = [ "serde_json", "sql-query-connector", "telemetry", - "thiserror", + "thiserror 1.0.44", "tokio", "tracing", "tracing-futures", @@ -3977,7 +3939,7 @@ dependencies = [ "serde_json", "sql-query-connector", "telemetry", - "thiserror", + "thiserror 1.0.44", "tokio", "tracing", "tracing-futures", @@ -4003,7 +3965,7 @@ dependencies = [ "nanoid", "prisma-value", "psl", - "thiserror", + "thiserror 1.0.44", "ulid", "uuid", ] @@ -4050,7 +4012,7 @@ dependencies = [ "sql-query-connector", "strip-ansi-escapes", "telemetry", - "thiserror", + "thiserror 1.0.44", "tokio", "tracing", "tracing-error", @@ -4335,7 +4297,7 @@ dependencies = [ "serde_json", "sql-query-connector", "telemetry", - "thiserror", + "thiserror 1.0.44", "tracing", "url", "user-facing-errors", @@ -4787,7 +4749,7 @@ checksum = "fabfb6138d2383ea8208cf98ccf69cdfb1aff4088460681d84189aa259762f97" dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.96", ] [[package]] @@ -4798,7 +4760,7 @@ checksum = "e578a843d40b4189a4d66bba51d7684f57da5bd7c304c64e14bd63efbef49509" dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.96", ] [[package]] @@ -4821,7 +4783,7 @@ checksum = "3081f5ffbb02284dda55132aa26daecedd7372a42417bbbab6f14ab7d6bb9145" dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.96", ] [[package]] @@ -4863,7 +4825,7 @@ dependencies = [ "darling 0.20.10", "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.96", ] [[package]] @@ -4888,7 +4850,7 @@ checksum = "91d129178576168c589c9ec973feedf7d3126c01ac2bf08795109aa35b69fb8f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.96", ] [[package]] @@ -5152,7 +5114,7 @@ dependencies = [ "serde_json", "sql-query-builder", "telemetry", - "thiserror", + "thiserror 1.0.44", "tokio", "tracing", "tracing-futures", @@ -5270,7 +5232,7 @@ dependencies = [ "percent-encoding", "smallvec", "sqlformat", - "thiserror", + "thiserror 1.0.44", "tracing", "url", ] @@ -5399,9 +5361,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.58" +version = "2.0.96" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44cfb93f38070beee36b3fef7d4f5a16f27751d94b187b666a5cc5e9b0d30687" +checksum = "d5d0adab1ae378d7f53bdebc67a39f1f151407ef230f0ce2883572f5d8985c80" dependencies = [ "proc-macro2", "quote", @@ -5444,7 +5406,7 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", - "thiserror", + "thiserror 1.0.44", "tokio", "tracing", "tracing-futures", @@ -5540,7 +5502,16 @@ version = "1.0.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "611040a08a0439f8248d1990b111c95baa9c704c805fa1f62104b39655fd7f90" dependencies = [ - "thiserror-impl", + "thiserror-impl 1.0.44", +] + +[[package]] +name = "thiserror" +version = "2.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d452f284b73e6d76dd36758a0c8684b1d5be31f92b89d07fd5822175732206fc" +dependencies = [ + "thiserror-impl 2.0.11", ] [[package]] @@ -5551,7 +5522,18 @@ checksum = "090198534930841fab3a5d1bb637cde49e339654e606195f8d9c76eeb081dc96" dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.96", +] + +[[package]] +name = "thiserror-impl" +version = "2.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26afc1baea8a989337eeb52b6e72a039780ce45c3edfcc9c5b9d112feeb173c2" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.96", ] [[package]] @@ -5586,7 +5568,7 @@ dependencies = [ "opentls", "pin-project-lite", "pretty-hex", - "thiserror", + "thiserror 1.0.44", "tokio", "tokio-util", "tracing", @@ -5677,7 +5659,7 @@ checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.96", ] [[package]] @@ -5725,6 +5707,20 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4bf6fecd69fcdede0ec680aaf474cdab988f9de6bc73d3758f0160e3b7025a" +dependencies = [ + "futures-util", + "log", + "native-tls", + "tokio", + "tokio-native-tls", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.8" @@ -5775,7 +5771,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.96", ] [[package]] @@ -5869,7 +5865,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568" dependencies = [ "quote", - "syn 2.0.58", + "syn 2.0.96", ] [[package]] @@ -5900,14 +5896,14 @@ dependencies = [ "proc-macro2", "quote", "serde_derive_internals", - "syn 2.0.58", + "syn 2.0.96", ] [[package]] name = "tungstenite" -version = "0.24.0" +version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" +checksum = "413083a99c579593656008130e29255e54dcaae495be556cc26888f211648c24" dependencies = [ "byteorder", "bytes", @@ -5918,7 +5914,7 @@ dependencies = [ "native-tls", "rand 0.8.5", "sha1", - "thiserror", + "thiserror 2.0.11", "utf-8", ] @@ -6223,7 +6219,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.96", "wasm-bindgen-shared", ] @@ -6257,7 +6253,7 @@ checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836" dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.96", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -6620,26 +6616,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "ws_stream_tungstenite" -version = "0.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed39ff9f8b2eda91bf6390f9f49eee93d655489e15708e3bb638c1c4f07cecb4" -dependencies = [ - "async-tungstenite", - "async_io_stream", - "bitflags 2.4.0", - "futures-core", - "futures-io", - "futures-sink", - "futures-util", - "pharos", - "rustc_version", - "tokio", - "tracing", - "tungstenite", -] - [[package]] name = "wyz" version = "0.5.1" @@ -6672,5 +6648,5 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.96", ] diff --git a/quaint/Cargo.toml b/quaint/Cargo.toml index 0c388c44596..0b683ec8935 100644 --- a/quaint/Cargo.toml +++ b/quaint/Cargo.toml @@ -51,8 +51,7 @@ postgresql-native = [ "bit-vec", "lru-cache", "byteorder", - "dep:ws_stream_tungstenite", - "dep:async-tungstenite" + "dep:tokio-tungstenite", ] postgresql = [] @@ -84,6 +83,7 @@ hex = "0.4" itertools.workspace = true regex.workspace = true enumflags2.workspace = true +pin-project.workspace = true either = { version = "1.6" } base64 = { version = "0.12.3" } @@ -116,14 +116,9 @@ tracing-test = "0.2" [target.'cfg(target_arch = "wasm32")'.dependencies] getrandom = { workspace = true, features = ["js"] } -[dependencies.ws_stream_tungstenite] -version = "0.14.0" -features = ["tokio_io"] -optional = true - -[dependencies.async-tungstenite] -version = "0.28.0" -features = ["tokio-runtime", "tokio-native-tls"] +[dependencies.tokio-tungstenite] +version = "0.26.1" +features = ["native-tls"] optional = true [dependencies.byteorder] diff --git a/quaint/src/connector/postgres/native/websocket.rs b/quaint/src/connector/postgres/native/websocket.rs index f278c9f099b..29f1c3d8990 100644 --- a/quaint/src/connector/postgres/native/websocket.rs +++ b/quaint/src/connector/postgres/native/websocket.rs @@ -1,20 +1,32 @@ -use std::str::FromStr; +use std::{ + io::{Error as IoError, ErrorKind as IoErrorKind}, + pin::Pin, + str::FromStr, + task::{ready, Context, Poll}, +}; -use async_tungstenite::{ - tokio::connect_async, +use bytes::Bytes; +use futures::{FutureExt, Sink, SinkExt, Stream}; +use pin_project::pin_project; +use postgres_native_tls::TlsConnector; +use prisma_metrics::WithMetricsInstrumentation; +use tokio::{ + io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf}, + net::TcpStream, +}; +use tokio_postgres::{Client, Config}; +use tokio_tungstenite::{ + connect_async, tungstenite::{ self, client::IntoClientRequest, http::{HeaderMap, HeaderValue, StatusCode}, - Error as TungsteniteError, + Error as TungsteniteError, Message, }, + MaybeTlsStream, WebSocketStream, }; -use futures::FutureExt; -use postgres_native_tls::TlsConnector; -use prisma_metrics::WithMetricsInstrumentation; -use tokio_postgres::{Client, Config}; +use tokio_util::io::StreamReader; use tracing_futures::WithSubscriber; -use ws_stream_tungstenite::WsStream; use crate::{ connector::PostgresWebSocketUrl, @@ -35,20 +47,22 @@ pub(crate) async fn connect_via_websocket(url: PostgresWebSocketUrl) -> crate::R if let Some(db_name) = db_name { config.dbname(&db_name); } - let ws_byte_stream = WsStream::new(ws_stream); + let ws_byte_stream = WsTunnel::new(ws_stream); let tls = TlsConnector::new(native_tls::TlsConnector::new()?, db_host); let (client, connection) = config.connect_raw(ws_byte_stream, tls).await?; + tokio::spawn( connection - .map(|r| { - if let Err(e) = r { - tracing::error!("Error in PostgreSQL WebSocket connection: {e:?}"); + .map(move |result| { + if let Err(err) = result { + tracing::error!("Error in PostgreSQL WebSocket connection: {err:?}"); } }) .with_current_subscriber() .with_current_recorder(), ); + Ok(client) } @@ -95,3 +109,94 @@ impl From for error::Error { builder.build() } } + +#[pin_project] +struct WsTunnel(#[pin] StreamReader); + +impl WsTunnel { + fn new(stream: WebSocketStream>) -> Self { + WsTunnel(StreamReader::new(WsBytesStream(stream))) + } +} + +#[pin_project] +struct WsBytesStream(#[pin] WebSocketStream>); + +impl WsBytesStream { + fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut WebSocketStream>> { + self.project().0 + } +} + +impl Stream for WsBytesStream { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.project().0.poll_next(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(None) => Poll::Ready(None), + Poll::Ready(Some(Ok(res))) => { + if let Message::Binary(b) = res { + Poll::Ready(Some(Ok(b))) + } else { + Poll::Ready(Some(Err(IoError::new( + IoErrorKind::Other, + "TCP tunneling requires binary frames", + )))) + } + } + Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(IoError::new(IoErrorKind::Other, err)))), + } + } + + fn size_hint(&self) -> (usize, Option) { + self.0.size_hint() + } +} + +impl AsyncRead for WsTunnel { + fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { + self.project().0.poll_read(cx, buf) + } +} + +impl AsyncBufRead for WsTunnel { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().0.poll_fill_buf(cx) + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + self.project().0.consume(amt) + } +} + +impl AsyncWrite for WsTunnel { + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + let stream = &mut self.get_mut().0.get_mut().0; + ready!(stream + .poll_ready_unpin(cx) + .map_err(|err| IoError::new(IoErrorKind::Other, err)))?; + match stream.start_send_unpin(Message::Binary(Bytes::copy_from_slice(buf))) { + Ok(()) => Poll::Ready(Ok(buf.len())), + Err(e) => Poll::Ready(Err(IoError::new(IoErrorKind::Other, e))), + } + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project() + .0 + .get_pin_mut() + .get_pin_mut() + .poll_flush(cx) + .map_err(|err| IoError::new(IoErrorKind::Other, err)) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project() + .0 + .get_pin_mut() + .get_pin_mut() + .poll_close(cx) + .map_err(|err| IoError::new(IoErrorKind::Other, err)) + } +}