advance-practice/bridging-with-sync #900
Replies: 2 comments 2 replies
-
我才意识到运行时是runtime... |
Beta Was this translation helpful? Give feedback.
0 replies
-
发现个奇怪的问题：用 block_on 请求分块（chunked）传输的 HTTP 响应时，读到中间就断了；如果用纯异步的方式则是可以的。谁知道这是怎么回事吗？
use std::sync::mpsc::{self, Sender};
use std::thread;
use futures_util::{StreamExt, Stream, TryStreamExt};
use http::{request::Builder, Method};
use hyper::body::HttpBody;
use hyper::{Body};
use hyper::client::Client;
use serde_derive::{Serialize, Deserialize};
use http::header::CONTENT_TYPE;
use futures_util::future::FutureExt;
use tokio::sync::mpsc as tokio_mpsc;
/**
Cargo.toml 文件：
[package]
name = "stream-test"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
hyper = { version = "0.14", features = [ "full" ]}
tokio = { version = "1", features = ["full"] }
http = "0.2"
serde = "1.0"
serde_json = "1.0"
base64 = "0.13"
serde_derive = "1.0"
futures-util = "0.3"
*/
fn main() {
// println!("{:?}", response.body());
// let (tx, mut rx) = tokio_mpsc::channel::<String>(1000);
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let handle = thread::spawn(move || {
rt.block_on(async {
let client = Client::new();
let req_builder = Builder::new().method(Method::POST).header("X-Registry-Auth", base64_url_encode(&get_payload()));
let req = req_builder
.uri("http://127.0.0.1:2375/v1.40/images/create?fromImage=ibmcom%2Fhelloworld&fromSrc=&repo=&tag=latest&platform=")
.header(CONTENT_TYPE, "application/json")
.body(Body::empty()).unwrap();
let mut response = client.request(req).await.unwrap();
// let mut response = client.request(req).await.unwrap();
let body = response.body_mut();
let mut data = body.data().into_stream();
while let Some(data) = data.next().await {
println!("aaaaaaaaaaaaaaaaaaaaaaa: {:?}", data);
if let Some(Ok(d)) = data {
let string = String::from_utf8(d.into_iter().collect::<Vec<u8>>()).unwrap_or_default();
println!("{}", string);
// tx.send(string);
} else {
println!("data_stream next value {:?}", data);
}
}
});
});
// for x in tokio::runtime::Builder::new_current_thread().enable_io().build().unwrap().block_on(rx.recv()) {
// println!("x: {}", x);
// }
handle.join().unwrap();
} |
Beta Was this translation helpful? Give feedback.
2 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
advance-practice/bridging-with-sync
https://course.rs/advance-practice/bridging-with-sync.html
Beta Was this translation helpful? Give feedback.
All reactions