Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[xitca-web] add bench for async orm #9287

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
291 changes: 251 additions & 40 deletions frameworks/Rust/xitca-web/Cargo.lock

Large diffs are not rendered by default.

33 changes: 25 additions & 8 deletions frameworks/Rust/xitca-web/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,35 @@ name = "xitca-web-axum"
path = "./src/main_axum.rs"
required-features = ["axum", "io-uring", "perf", "pg-sync", "template"]

[[bin]]
name = "xitca-web-orm"
path = "./src/main_orm.rs"
required-features = ["pg-orm-async", "template", "web-codegen"]

[[bin]]
name = "xitca-web-sync"
path = "./src/main_sync.rs"
required-features = ["pg-orm", "template", "web-codegen"]

[features]
# pg optional
# pg client optional
pg = ["dep:xitca-postgres"]
# pg send/sync optional
# pg client send/sync optional
pg-sync = ["dep:xitca-postgres"]
# pg orm optional
pg-orm = ["dep:diesel"]
# diesel orm optional
pg-orm = ["diesel/r2d2"]
# diesel async orm optional
pg-orm-async = ["dep:diesel", "dep:diesel-async", "dep:xitca-postgres-diesel", "futures-util"]
# http router optional
router = ["xitca-http/router"]
# web optional
web = ["dep:xitca-web"]
# web codegen optional
# web with macros optional
web-codegen = ["xitca-web/codegen", "xitca-web/urlencoded"]
# template optional
template = ["dep:sailfish"]
# io-uring optional
io-uring = ["xitca-http/io-uring", "xitca-server/io-uring"]
io-uring = ["dep:tokio-uring", "xitca-http/io-uring", "xitca-server/io-uring"]
# axum optional
axum = ["dep:axum", "dep:http-body", "dep:tower", "dep:tower-http", "xitca-web/tower-http-compat" ]
# unrealistic performance optimization
Expand All @@ -68,7 +75,12 @@ xitca-web = { version = "0.6", features = ["json"], optional = true }
xitca-postgres = { version = "0.1", optional = true }

# orm optional
diesel = { version = "2", features = ["postgres", "r2d2"], optional = true }
diesel = { version = "2", features = ["postgres"], optional = true }

# orm async optional
diesel-async = { version = "0.5", features = ["bb8", "postgres"], optional = true }
xitca-postgres-diesel = { version = "0.1", optional = true }
futures-util = { version = "0.3", default-features = false, optional = true }

# template optional
sailfish = { version = "0.9", default-features = false, features = ["derive", "perf-inline"], optional = true }
Expand All @@ -79,6 +91,9 @@ http-body = { version = "1", optional = true }
tower = { version = "0.4", optional = true }
tower-http = { version = "0.5", features = ["set-header"], optional = true }

# io-uring optional
tokio-uring = { version = "0.5", optional = true }

# perf optional
mimalloc = { version = "0.1", default-features = false, optional = true }

Expand All @@ -95,5 +110,7 @@ codegen-units = 1
panic = "abort"

[patch.crates-io]
xitca-postgres = { git = "https://github.com/HFQR/xitca-web.git", rev = "0cda225" }
xitca-postgres-diesel = { git = "https://github.com/fakeshadow/xitca-postgres-diesel", rev = "8addeca" }

diesel-async = { git = "https://github.com/weiznich/diesel_async", rev = "5b8262b" }
mio = { git = "https://github.com/fakeshadow/mio", rev = "9bae6012b7ecfc6083350785f71a5e8265358178" }
22 changes: 22 additions & 0 deletions frameworks/Rust/xitca-web/benchmark_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,28 @@
"notes": "",
"versus": ""
},
"orm": {
"json_url": "/json",
"plaintext_url": "/plaintext",
"db_url": "/db",
"fortune_url": "/fortunes",
"query_url": "/queries?q=",
"update_url": "/updates?q=",
"port": 8080,
"approach": "realistic",
"classification": "fullstack",
"database": "postgres",
"framework": "xitca-web [orm]",
fakeshadow marked this conversation as resolved.
Show resolved Hide resolved
"language": "rust",
"orm": "full",
"platform": "none",
"webserver": "xitca-server",
"os": "linux",
"database_os": "linux",
"display_name": "xitca-web [orm]",
"notes": "",
"versus": ""
},
"sync": {
"json_url": "/json",
"plaintext_url": "/plaintext",
Expand Down
24 changes: 11 additions & 13 deletions frameworks/Rust/xitca-web/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#![allow(clippy::unnecessary_lazy_evaluations)]

use xitca_io::bytes::BytesMut;
use xitca_postgres::{pipeline::Pipeline, pool::Pool, AsyncLendingIterator, Type};
use xitca_postgres::{pipeline::Pipeline, pool::Pool, types::Type, AsyncLendingIterator};

use super::{
ser::{Fortune, Fortunes, World},
Expand All @@ -21,11 +21,9 @@ pub struct Client {
type Shared = (Rand, BytesMut);

const FORTUNE_SQL: &str = "SELECT * FROM fortune";

const FORTUNE_SQL_TYPES: &[Type] = &[];

const WORLD_SQL: &str = "SELECT * FROM world WHERE id=$1";

const WORLD_SQL_TYPES: &[Type] = &[Type::INT4];

fn update_query(num: usize) -> Box<str> {
Expand Down Expand Up @@ -71,18 +69,18 @@ impl Client {

pub async fn get_world(&self) -> HandleResult<World> {
let mut conn = self.pool.get().await?;
let stmt = conn.prepare(WORLD_SQL, WORLD_SQL_TYPES).await?;
let stmt = conn.prepare_cache(WORLD_SQL, WORLD_SQL_TYPES).await?;
let id = self.shared().0.gen_id();
let mut res = conn.consume().query_raw(&stmt, [id])?;
let row = res.try_next().await?.ok_or_else(|| "World does not exist")?;
Ok(World::new(row.get_raw(0), row.get_raw(1)))
Ok(World::new(row.get(0), row.get(1)))
}

pub async fn get_worlds(&self, num: u16) -> HandleResult<Vec<World>> {
let len = num as usize;

let mut conn = self.pool.get().await?;
let stmt = conn.prepare(WORLD_SQL, WORLD_SQL_TYPES).await?;
let stmt = conn.prepare_cache(WORLD_SQL, WORLD_SQL_TYPES).await?;

let mut res = {
let (ref mut rng, ref mut buf) = *self.shared();
Expand All @@ -95,7 +93,7 @@ impl Client {

while let Some(mut item) = res.try_next().await? {
while let Some(row) = item.try_next().await? {
worlds.push(World::new(row.get_raw(0), row.get_raw(1)))
worlds.push(World::new(row.get(0), row.get(1)))
}
}

Expand All @@ -108,8 +106,8 @@ impl Client {
let update = self.updates.get(len).ok_or_else(|| "num out of bound")?;

let mut conn = self.pool.get().await?;
let world_stmt = conn.prepare(WORLD_SQL, WORLD_SQL_TYPES).await?;
let update_stmt = conn.prepare(update, &[]).await?;
let world_stmt = conn.prepare_cache(WORLD_SQL, WORLD_SQL_TYPES).await?;
let update_stmt = conn.prepare_cache(update, &[]).await?;

let mut params = Vec::with_capacity(len);

Expand All @@ -133,7 +131,7 @@ impl Client {
while let Some(mut item) = res.try_next().await? {
while let Some(row) = item.try_next().await? {
let r_id = r_ids.next().unwrap()[1];
worlds.push(World::new(row.get_raw(0), r_id))
worlds.push(World::new(row.get(0), r_id))
}
}

Expand All @@ -145,11 +143,11 @@ impl Client {
items.push(Fortune::new(0, "Additional fortune added at request time."));

let mut conn = self.pool.get().await?;
let stmt = conn.prepare(FORTUNE_SQL, FORTUNE_SQL_TYPES).await?;
let mut res = conn.consume().query_raw::<[i32; 0]>(&stmt, [])?;
let stmt = conn.prepare_cache(FORTUNE_SQL, FORTUNE_SQL_TYPES).await?;
let mut res = conn.consume().query_raw::<_, [i32; 0]>(&stmt, [])?;

while let Some(row) = res.try_next().await? {
items.push(Fortune::new(row.get_raw(0), row.get_raw::<String>(1)));
items.push(Fortune::new(row.get(0), row.get::<String>(1)));
}

items.sort_by(|it, next| it.message.cmp(&next.message));
Expand Down
153 changes: 153 additions & 0 deletions frameworks/Rust/xitca-web/src/db_diesel_async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
use std::{
io,
sync::{Arc, Mutex},
};

use diesel::prelude::*;
use diesel_async::{
pooled_connection::{bb8, AsyncDieselConnectionManager},
RunQueryDsl,
};
use futures_util::{
future::join,
stream::{FuturesUnordered, TryStreamExt},
};
use xitca_postgres_diesel::AsyncPgConnection;

use crate::{
ser::{Fortune, Fortunes, World},
util::{bulk_update_gen, Error, HandleResult, Rand, DB_URL},
};

pub type Pool = Arc<_Pool>;

pub struct _Pool {
pool: bb8::Pool<AsyncPgConnection>,
rng: Mutex<Rand>,
}

pub async fn create() -> io::Result<Arc<_Pool>> {
bb8::Pool::builder()
.max_size(1)
.min_idle(Some(1))
.test_on_check_out(false)
.build(AsyncDieselConnectionManager::new(DB_URL))
.await
.map_err(io::Error::other)
.map(|pool| {
Arc::new(_Pool {
pool,
rng: Mutex::new(Rand::default()),
})
})
}

#[cold]
#[inline(never)]
fn not_found() -> Error {
"world not found".into()
}

impl _Pool {
pub async fn get_world(&self) -> HandleResult<World> {
use crate::schema::world::dsl::*;
{
let w_id = self.rng.lock().unwrap().gen_id();
let mut conn = self.pool.get().await?;
world.filter(id.eq(w_id)).load(&mut conn)
}
.await?
.pop()
.ok_or_else(not_found)
}

pub async fn get_worlds(&self, num: u16) -> HandleResult<Vec<World>> {
use crate::schema::world::dsl::*;
{
let mut conn = self.pool.get().await?;
let mut rng = self.rng.lock().unwrap();
(0..num)
.map(|_| {
let w_id = rng.gen_id();
let fut = world.filter(id.eq(w_id)).load::<World>(&mut conn);
async { fut.await?.pop().ok_or_else(not_found) }
})
.collect::<FuturesUnordered<_>>()
}
.try_collect()
.await
}

pub async fn update(&self, num: u16) -> HandleResult<Vec<World>> {
use crate::schema::world::dsl::*;

let mut rngs = Vec::with_capacity(num as _);

let (select_res, update_res) = {
let mut conn = self.pool.get().await?;

let mut rng = self.rng.lock().unwrap();

let select = (0..num)
.map(|_| {
let w_id = rng.gen_id();
let num = rng.gen_id();

rngs.push((w_id, num));

let fut = world.filter(id.eq(w_id)).load::<World>(&mut conn);

async move {
fut.await?
.pop()
.map(|mut w| {
w.randomnumber = num;
w
})
.ok_or_else(not_found)
}
})
.collect::<FuturesUnordered<_>>();

rngs.sort_by(|(a, _), (b, _)| a.cmp(b));

let update = diesel::sql_query(update_query(&rngs)).execute(&mut conn);

join(select.try_collect::<Vec<_>>(), update)
}
.await;

update_res?;
let mut worlds = select_res?;

worlds.sort_by_key(|w| w.id);

Ok(worlds)
}

pub async fn tell_fortune(&self) -> HandleResult<Fortunes> {
use crate::schema::fortune::dsl::*;

let mut items = {
let mut conn = self.pool.get().await?;
fortune.load::<Fortune>(&mut conn)
}
.await?;

items.push(Fortune::new(0, "Additional fortune added at request time."));
items.sort_by(|it, next| it.message.cmp(&next.message));

Ok(Fortunes::new(items))
}
}

// diesel does not support high level bulk update api. use raw sql to bypass the limitation.
// relate discussion: https://github.com/diesel-rs/diesel/discussions/2879
fn update_query(ids: &[(i32, i32)]) -> String {
bulk_update_gen(|query| {
use std::fmt::Write;
ids.iter().for_each(|(w_id, num)| {
write!(query, "({}::int,{}::int),", w_id, num).unwrap();
});
})
}
Loading
Loading