-
Notifications
You must be signed in to change notification settings - Fork 1
/
worker_pool.rs
79 lines (65 loc) · 1.77 KB
/
worker_pool.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
use futures_util::future::join_all;
use hebi::prelude::*;
#[tokio::main]
async fn main() {
let client = reqwest::Client::new();
let module = NativeModule::builder("http")
.async_function("get", move |scope| get(scope, client.clone()))
.finish();
const CONCURRENCY: usize = 16;
let pool = WorkerPool::new(&[module], CONCURRENCY);
let mut handles = vec![];
for index in 0..CONCURRENCY {
let pool = pool.clone();
let source = format!(
r#"
import http
http.get("https://jsonplaceholder.typicode.com/todos/{index}")
"#
);
let task = tokio::spawn(async move {
let mut hebi = pool.get().await;
let response = hebi.eval_async(&source).await.unwrap();
println!("{response}");
pool.put(hebi);
});
handles.push(task);
}
join_all(handles).await;
}
#[derive(Clone)]
struct WorkerPool {
tx: flume::Sender<Hebi>,
rx: flume::Receiver<Hebi>,
}
impl WorkerPool {
pub fn new(modules: &[NativeModule], capacity: usize) -> Self {
let (tx, rx) = flume::bounded(capacity);
for _ in 0..capacity {
let mut worker = Hebi::new();
for module in modules {
worker.register(module);
}
tx.send(worker).unwrap();
}
Self { tx, rx }
}
pub async fn get(&self) -> Hebi {
self.rx.recv_async().await.unwrap()
}
pub fn put(&self, worker: Hebi) {
self.tx.send(worker).unwrap()
}
}
async fn get(scope: Scope<'_>, client: reqwest::Client) -> hebi::Result<Str<'_>> {
let url = scope.param::<Str>(0)?;
let response = client
.get(url.as_str())
.send()
.await
.map_err(hebi::Error::user)?;
let bytes = response.bytes().await.map_err(hebi::Error::user)?;
let data = bytes.to_vec();
let str = String::from_utf8(data).map_err(hebi::Error::user)?;
Ok(scope.new_string(str))
}