Skip to content

Commit

Permalink
Fix: reduce async runtime blocking (#1336)
Browse files Browse the repository at this point in the history
This PR aims to optimize request latency by reducing async runtime
blocking

* All functions of GraphQL objects are blocking, and many trigger file
I/O. However, GraphQL requests were being executed via an async
function, which blocked the async runtime. Use the sync version of
GraphQL execution (`execute_sync`) and run it via
`tokio::task::spawn_blocking`
* Use `tokio::task::spawn_blocking` for operations involving file I/O
* Reduce the regions held under sync mutex locks where possible

### Performance

* std::sync::Mutex should be very efficient when there's no contention,
and none is expected in graphql routines
* I've benchmarked the overhead of `tokio::task::spawn_blocking`, since
it's used more now: on my computer (Core i5 MacBook) I can spawn and
`.await` around 90k tasks per second, which is at least two orders of
magnitude higher than the number of requests Josh can currently handle
per second, so the added overhead is negligible.

commit-id:bd8098e1
  • Loading branch information
vlad-ivanov-name authored Jun 13, 2024
1 parent 7e755c0 commit 3768e10
Showing 1 changed file with 162 additions and 87 deletions.
249 changes: 162 additions & 87 deletions josh-proxy/src/bin/josh-proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use josh::{josh_error, JoshError, JoshResult};
use josh_rpc::calls::RequestedCommand;
use serde::Serialize;
use std::collections::HashMap;
use std::ffi::OsStr;
use std::io;
use std::net::IpAddr;
use std::path::{Path, PathBuf};
Expand Down Expand Up @@ -103,17 +102,22 @@ impl std::fmt::Debug for JoshProxyService {
}
}

fn fetch_needed(
async fn fetch_needed(
service: Arc<JoshProxyService>,
remote_url: &String,
upstream_repo: &String,
remote_url: &str,
upstream_repo: &str,
force: bool,
head_ref: Option<&str>,
head_ref_resolved: Option<&str>,
) -> Result<bool, FetchError> {
let fetch_timer_ok = {
if let Some(last) = service.fetch_timers.read()?.get(remote_url) {
let since = std::time::Instant::now().duration_since(*last);
let last = {
let fetch_timers = service.fetch_timers.read()?;
fetch_timers.get(remote_url).cloned()
};

if let Some(last) = last {
let since = std::time::Instant::now().duration_since(last);
let max = std::time::Duration::from_secs(ARGS.cache_duration);

tracing::trace!("last: {:?}, since: {:?}, max: {:?}", last, since, max);
Expand All @@ -123,34 +127,46 @@ fn fetch_needed(
}
};

let resolve_cache_ref = |cache_ref: &str| -> JoshResult<Option<git2::Oid>> {
let transaction = josh::cache::Transaction::open(
&service.repo_path.join("mirror"),
Some(&format!(
"refs/josh/upstream/{}/",
&josh::to_ns(&upstream_repo),
)),
)?;
let resolve_cache_ref = |cache_ref: &str| {
let cache_ref = cache_ref.to_string();
let upstream_repo = upstream_repo.to_string();

match transaction
.repo()
.refname_to_id(&transaction.refname(cache_ref))
{
Ok(oid) => Ok(Some(oid)),
Err(_) => Ok(None),
}
tokio::task::spawn_blocking(move || {
let transaction = josh::cache::Transaction::open(
&service.repo_path.join("mirror"),
Some(&format!(
"refs/josh/upstream/{}/",
&josh::to_ns(&upstream_repo),
)),
)?;

match transaction
.repo()
.refname_to_id(&transaction.refname(&cache_ref))
{
Ok(oid) => Ok(Some(oid)),
Err(_) => Ok(None),
}
})
};

match (force, fetch_timer_ok, head_ref, head_ref_resolved) {
(false, true, None, _) => return Ok(false),
(false, true, Some(head_ref), _) => {
if (resolve_cache_ref(head_ref).map_err(FetchError::from_josh_error)?).is_some() {
if (resolve_cache_ref(head_ref)
.await?
.map_err(FetchError::from_josh_error)?)
.is_some()
{
trace!("cache ref resolved");
return Ok(false);
}
}
(false, false, Some(head_ref), Some(head_ref_resolved)) => {
if let Some(oid) = resolve_cache_ref(head_ref).map_err(FetchError::from_josh_error)? {
if let Some(oid) = resolve_cache_ref(head_ref)
.await?
.map_err(FetchError::from_josh_error)?
{
if oid.to_string() == head_ref_resolved {
trace!("cache ref resolved and matches");
return Ok(false);
Expand Down Expand Up @@ -199,15 +215,16 @@ async fn fetch_upstream(
force,
head_ref,
head_ref_resolved,
)? {
)
.await?
{
return Ok(());
}

let us = upstream_repo.clone();
let semaphore = service
.fetch_permits
.lock()?
.entry(us.clone())
.entry(upstream_repo.clone())
.or_insert(Arc::new(tokio::sync::Semaphore::new(1)))
.clone();
let permit = semaphore.acquire().await;
Expand All @@ -222,36 +239,52 @@ async fn fetch_upstream(
force,
head_ref,
head_ref_resolved,
)? {
)
.await?
{
return Ok(());
}

let fetch_result = {
let span = tracing::span!(tracing::Level::INFO, "fetch_refs_from_url");

let mirror_path = service.repo_path.join("mirror");
let upstream_repo = upstream_repo.clone();
let remote_url = remote_url.clone();
let remote_auth = remote_auth.clone();

tokio::task::spawn_blocking(move || {
let _span_guard = span.enter();
josh_proxy::fetch_refs_from_url(
&mirror_path,
&upstream_repo,
&remote_url,
&refs_to_fetch,
&remote_auth,
)
})
.await?
};

let hres = {
let span = tracing::span!(tracing::Level::INFO, "get_head");

let mirror_path = service.repo_path.join("mirror");
let remote_url = remote_url.clone();
let remote_auth = remote_auth.clone();

tokio::task::spawn_blocking(move || {
let _span_guard = span.enter();
josh_proxy::get_head(&mirror_path, &remote_url, &remote_auth)
})
.await?
};

let fetch_timers = service.fetch_timers.clone();
let heads_map = service.heads_map.clone();
let br_path = service.repo_path.join("mirror");

let span = tracing::span!(tracing::Level::INFO, "fetch_refs_from_url");
let ru = remote_url.clone();
let task_remote_auth = remote_auth.clone();
let fetch_result = tokio::task::spawn_blocking(move || {
let _span_guard = span.enter();
josh_proxy::fetch_refs_from_url(&br_path, &us, &ru, &refs_to_fetch, &task_remote_auth)
})
.await?;

let us = upstream_repo.clone();
let s = tracing::span!(tracing::Level::INFO, "get_head");
let br_path = service.repo_path.join("mirror");
let ru = remote_url.clone();
let task_remote_auth = remote_auth.clone();
let hres = tokio::task::spawn_blocking(move || {
let _e = s.enter();
josh_proxy::get_head(&br_path, &ru, &task_remote_auth)
})
.await?;

if let Ok(hres) = hres {
heads_map.write()?.insert(us, hres);
heads_map.write()?.insert(upstream_repo.clone(), hres);
}

std::mem::drop(permit);
Expand Down Expand Up @@ -1742,6 +1775,7 @@ fn update_hook(refname: &str, old: &str, new: &str) -> josh::JoshResult<i32> {
}
}

#[tracing::instrument(skip_all)]
async fn serve_graphql(
serv: Arc<JoshProxyService>,
req: Request<hyper::Body>,
Expand All @@ -1750,46 +1784,91 @@ async fn serve_graphql(
auth: josh_proxy::auth::Handle,
) -> josh::JoshResult<Response<hyper::Body>> {
let remote_url = upstream.clone() + upstream_repo.as_str();
let parsed = match josh_proxy::juniper_hyper::parse_req(req).await {
Ok(r) => r,
let parsed_request = match josh_proxy::juniper_hyper::parse_req(req).await {
Ok(parsed_request) => {
// Even though there's a mutex, it's just to manage access
// between sync and async code, so no contention is expected
Arc::new(std::sync::Mutex::new(parsed_request))
}
Err(resp) => return Ok(resp),
};

let transaction_mirror = josh::cache::Transaction::open(
&serv.repo_path.join("mirror"),
Some(&format!(
"refs/josh/upstream/{}/",
&josh::to_ns(&upstream_repo),
)),
)?;
let transaction = josh::cache::Transaction::open(&serv.repo_path.join("overlay"), None)?;
transaction.repo().odb()?.add_disk_alternate(
serv.repo_path
.join("mirror")
.join("objects")
.to_str()
.unwrap(),
)?;
let context = std::sync::Arc::new(josh::graphql::context(transaction, transaction_mirror));
let root_node = std::sync::Arc::new(josh::graphql::repo_schema(
let context = {
let upstream_repo = upstream_repo.clone();
let serv = serv.clone();

tokio::task::spawn_blocking(move || -> josh::JoshResult<_> {
let transaction_mirror = josh::cache::Transaction::open(
&serv.repo_path.join("mirror"),
Some(&format!(
"refs/josh/upstream/{}/",
&josh::to_ns(&upstream_repo),
)),
)?;

let transaction =
josh::cache::Transaction::open(&serv.repo_path.join("overlay"), None)?;
transaction.repo().odb()?.add_disk_alternate(
&serv
.repo_path
.join("mirror")
.join("objects")
.display()
.to_string(),
)?;

Ok(Arc::new(josh::graphql::context(
transaction,
transaction_mirror,
)))
})
.await??
};

let root_node = Arc::new(josh::graphql::repo_schema(
upstream_repo
.strip_suffix(".git")
.unwrap_or(&upstream_repo)
.to_string(),
false,
));

let run_request = |span: tracing::Span| {
let context = context.clone();
let parsed_request = parsed_request.clone();
let root_node = root_node.clone();

tokio::task::spawn_blocking(move || {
let _span_guard = span.enter();

let parsed_request = parsed_request.lock().unwrap();
let result = parsed_request.execute_sync(&root_node, &context);

let response_code = if result.is_ok() {
StatusCode::OK
} else {
StatusCode::BAD_REQUEST
};

let response_json = serde_json::to_string_pretty(&result)
.expect("bug: failed to serialize GraphQL response");

(response_code, response_json)
})
};

let remote_auth = RemoteAuth::Http { auth };
let res = {
let (response_code, response_json) = {
// First attempt to serve GraphQL query. If we can serve it
// that means all requested revisions were specified by SHA and we could find
// all of them locally, so no need to fetch.
let res = parsed.execute(&root_node, &context).await;
let execute_span = tracing::info_span!("execute_1");
let (response_code, response_json) = run_request(execute_span).await?;

// The "allow_refs" flag will be set by the query handler if we need to do a fetch
// to complete the query.
if !*context.allow_refs.lock().unwrap() {
res
(response_code, response_json)
} else {
match fetch_upstream(
serv.clone(),
Expand Down Expand Up @@ -1820,26 +1899,21 @@ async fn serve_graphql(
}
};

parsed.execute(&root_node, &context).await
let execute_span = tracing::info_span!("execute_2");
run_request(execute_span).await?
}
};

let code = if res.is_ok() {
hyper::StatusCode::OK
} else {
hyper::StatusCode::BAD_REQUEST
let response = {
let mut response = Response::new(hyper::Body::from(response_json));
*response.status_mut() = response_code;
response.headers_mut().insert(
hyper::header::CONTENT_TYPE,
hyper::header::HeaderValue::from_static("application/json"),
);
response
};

let body = hyper::Body::from(serde_json::to_string_pretty(&res).unwrap());
let mut resp = Response::new(hyper::Body::empty());
*resp.status_mut() = code;
resp.headers_mut().insert(
hyper::header::CONTENT_TYPE,
hyper::header::HeaderValue::from_static("application/json"),
);
*resp.body_mut() = body;
let gql_result = resp;

tokio::task::spawn_blocking(move || -> josh::JoshResult<_> {
let temp_ns = Arc::new(josh_proxy::TmpGitNamespace::new(
&serv.repo_path.join("overlay"),
Expand Down Expand Up @@ -1884,7 +1958,8 @@ async fn serve_graphql(
})
.in_current_span()
.await??;
Ok(gql_result)

Ok(response)
}

async fn shutdown_signal() {
Expand Down

0 comments on commit 3768e10

Please sign in to comment.