diff --git a/Cargo.lock b/Cargo.lock index 4b93c7f5..4f185ac8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -122,6 +122,51 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "axum" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +dependencies = [ + "async-trait", + "axum-core", + "bitflags 1.3.2", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -1712,6 +1757,18 @@ dependencies = [ "winapi", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -1829,6 +1886,15 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.11.0" @@ -1858,7 +1924,7 @@ dependencies = [ "handlebars", "hex", "indoc", - "itertools", + "itertools 0.11.0", "juniper", "lazy_static", "log", @@ -1915,6 +1981,8 @@ dependencies = [ "lazy_static", "opentelemetry", "opentelemetry-jaeger", + "opentelemetry-otlp", + "opentelemetry-semantic-conventions 0.14.0", "percent-encoding", "regex", "reqwest", @@ -2086,6 +2154,12 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "memchr" version = "2.7.1" @@ -2287,10 +2361,41 @@ dependencies = [ "futures-core", "futures-util", "opentelemetry", - "opentelemetry-semantic-conventions", + "opentelemetry-semantic-conventions 0.12.0", "thrift", ] +[[package]] +name = "opentelemetry-otlp" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e5e5a5c4135864099f3faafbe939eb4d7f9b80ebf68a8448da961b32a7c1275" +dependencies = [ + "async-trait", + "futures-core", + "http", + "opentelemetry-proto", + "opentelemetry-semantic-conventions 0.12.0", + "opentelemetry_api", + "opentelemetry_sdk", + "prost", + "thiserror", + "tokio", + "tonic", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1e3f814aa9f8c905d0ee4bde026afd3b2577a97c10e1699912e3e44f0c4cbeb" +dependencies = [ + "opentelemetry_api", + "opentelemetry_sdk", + "prost", + "tonic", +] + [[package]] name = "opentelemetry-semantic-conventions" version = "0.12.0" @@ -2300,6 +2405,12 @@ dependencies = [ "opentelemetry", ] +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9ab5bd6c42fb9349dcf28af2ba9a0667f697f9bdcca045d39f2cec5543e2910" + [[package]] name = "opentelemetry_api" version = "0.20.0" @@ -2333,6 +2444,7 @@ dependencies = [ "percent-encoding", "rand 0.8.5", "regex", + "serde_json", "thiserror", ] @@ -2552,6 +2664,29 @@ dependencies = [ "human_format", ] +[[package]] +name = "prost" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" +dependencies = [ + "anyhow", + "itertools 0.10.5", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "quote" version = "1.0.35" @@ -2790,6 +2925,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rustversion" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4" + [[package]] name = "ryu" version = "1.0.17" @@ -3233,6 +3374,16 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.2.0" @@ -3254,6 +3405,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.10" @@ -3303,6 +3465,60 @@ dependencies = [ "winnow 0.6.2", ] +[[package]] +name = "tonic" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" +dependencies = [ + "async-trait", + "axum", + "base64", + "bytes", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap 1.9.3", + "pin-project", + "pin-project-lite", + "rand 0.8.5", + "slab", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + [[package]] name = "tower-service" version = "0.3.2" diff --git a/josh-core/src/lib.rs b/josh-core/src/lib.rs index 71d86a5b..71feb111 100644 --- a/josh-core/src/lib.rs +++ b/josh-core/src/lib.rs @@ -229,7 +229,7 @@ pub fn get_change_id(commit: &git2::Commit, sha: git2::Oid) -> Change { change } -#[tracing::instrument(skip(transaction))] +#[tracing::instrument(level = tracing::Level::TRACE, skip(transaction))] pub fn filter_commit( transaction: &cache::Transaction, filterobj: filter::Filter, diff --git a/josh-proxy/Cargo.toml b/josh-proxy/Cargo.toml index 5dd0c44d..37bf8e86 100644 --- a/josh-proxy/Cargo.toml +++ b/josh-proxy/Cargo.toml @@ -44,3 +44,5 @@ tempdir = "0.3.7" gix = { workspace = true } juniper = { workspace = true } git2 = { workspace = true } +opentelemetry-semantic-conventions = "0.14.0" +opentelemetry-otlp = "0.13.0" diff --git a/josh-proxy/src/bin/josh-proxy.rs b/josh-proxy/src/bin/josh-proxy.rs index b90d5aa8..cac2cb49 100644 --- a/josh-proxy/src/bin/josh-proxy.rs +++ b/josh-proxy/src/bin/josh-proxy.rs @@ -170,7 +170,7 @@ fn fetch_needed( return Ok(true); } -#[tracing::instrument] +#[tracing::instrument(skip(service))] async fn fetch_upstream( service: Arc, upstream_repo: String, @@ -237,7 +237,7 @@ async fn fetch_upstream( let heads_map = service.heads_map.clone(); let br_path = service.repo_path.join("mirror"); - let span = tracing::span!(tracing::Level::TRACE, "fetch worker"); + let span = tracing::span!(tracing::Level::INFO, "fetch_refs_from_url"); let ru = remote_url.clone(); let task_remote_auth = remote_auth.clone(); let fetch_result = tokio::task::spawn_blocking(move || { @@ -247,7 +247,7 @@ async fn fetch_upstream( .await?; let us = upstream_repo.clone(); - let s = tracing::span!(tracing::Level::TRACE, "get_head worker"); + let s = tracing::span!(tracing::Level::INFO, "get_head"); let br_path = service.repo_path.join("mirror"); let ru = remote_url.clone(); let task_remote_auth = remote_auth.clone(); @@ -348,11 +348,7 @@ async fn static_paths( Ok(None) } -#[tracing::instrument] -async fn repo_update_fn( - serv: Arc, - req: Request, -) -> josh::JoshResult> { +async fn repo_update_fn(req: Request) -> josh::JoshResult> { let body = hyper::body::to_bytes(req.into_body()).await; let s = tracing::span!(tracing::Level::TRACE, "repo update worker"); @@ -382,7 +378,7 @@ async fn repo_update_fn( }?) } -#[tracing::instrument] +#[tracing::instrument(skip(service))] async fn do_filter( repo_path: std::path::PathBuf, service: Arc, @@ -394,7 +390,7 @@ async fn do_filter( let permit = service.filter_permits.acquire().await; let heads_map = service.heads_map.clone(); - let tracing_span = tracing::span!(tracing::Level::TRACE, "do_filter worker"); + let tracing_span = tracing::span!(tracing::Level::INFO, "do_filter worker"); let head_ref = head_ref.clone(); tokio::task::spawn_blocking(move || { @@ -1111,7 +1107,7 @@ async fn handle_serve_namespace_request( } } -#[tracing::instrument] +#[tracing::instrument(skip(serv))] async fn call_service( serv: Arc, req_auth: (josh_proxy::auth::Handle, Request), @@ -1136,7 +1132,7 @@ async fn call_service( // When exposed to internet, should be blocked if path == "/repo_update" { - return repo_update_fn(serv, req).await; + return repo_update_fn(req).await; } if path == "/serve_namespace" { @@ -1273,7 +1269,6 @@ async fn call_service( None, false, ) - .in_current_span() .await { Ok(_) => {} @@ -1299,9 +1294,7 @@ async fn call_service( return serve_query(serv, q, meta.config.repo, filter, headref.get()).await; } - let temp_ns = prepare_namespace(serv.clone(), &meta, filter, &headref) - .in_current_span() - .await?; + let temp_ns = prepare_namespace(serv.clone(), &meta, filter, &headref).await?; let repo_path = serv .repo_path @@ -1317,14 +1310,18 @@ async fn call_service( .ok_or(josh::josh_error("repo_path.to_str"))? .to_string(); - let span = tracing::span!(tracing::Level::TRACE, "hyper_cgi"); - let _enter = span.enter(); - let mut context_propagator = HashMap::::default(); - let context = span.context(); - global::get_text_map_propagator(|propagator| { - propagator.inject_context(&context, &mut context_propagator); - }); - tracing::warn!("debug propagator: {:?}", context_propagator); + let context_propagator = { + let span = tracing::Span::current(); + + let mut context_propagator = HashMap::::default(); + let context = span.context(); + global::get_text_map_propagator(|propagator| { + propagator.inject_context(&context, &mut context_propagator); + }); + + tracing::debug!("context propagator: {:?}", context_propagator); + context_propagator + }; let repo_update = josh_proxy::RepoUpdate { refs: HashMap::new(), @@ -1339,37 +1336,41 @@ async fn call_service( context_propagator, }; - let mut cmd = Command::new("git"); - cmd.arg("http-backend"); - cmd.current_dir(&serv.repo_path.join("overlay")); - cmd.env("GIT_DIR", &repo_path); - cmd.env("GIT_HTTP_EXPORT_ALL", ""); - cmd.env( - "GIT_ALTERNATE_OBJECT_DIRECTORIES", - serv.repo_path - .join("mirror") - .join("objects") - .to_str() - .ok_or(josh::josh_error("repo_path.to_str"))?, - ); - cmd.env("GIT_NAMESPACE", temp_ns.name()); - cmd.env("GIT_PROJECT_ROOT", repo_path); - cmd.env("JOSH_REPO_UPDATE", serde_json::to_string(&repo_update)?); - cmd.env("PATH_INFO", parsed_url.pathinfo.clone()); + let cgi_response = async { + let mut cmd = Command::new("git"); + cmd.arg("http-backend"); + cmd.current_dir(&serv.repo_path.join("overlay")); + cmd.env("GIT_DIR", &repo_path); + cmd.env("GIT_HTTP_EXPORT_ALL", ""); + cmd.env( + "GIT_ALTERNATE_OBJECT_DIRECTORIES", + serv.repo_path + .join("mirror") + .join("objects") + .to_str() + .ok_or(josh::josh_error("repo_path.to_str"))?, + ); + cmd.env("GIT_NAMESPACE", temp_ns.name()); + cmd.env("GIT_PROJECT_ROOT", repo_path); + cmd.env("JOSH_REPO_UPDATE", serde_json::to_string(&repo_update)?); + cmd.env("PATH_INFO", parsed_url.pathinfo.clone()); - let git_span = tracing::span!(tracing::Level::TRACE, "git http backend"); - let cgires = hyper_cgi::do_cgi(req, cmd).instrument(git_span).await; + let (response, stderr) = hyper_cgi::do_cgi(req, cmd).await; + tracing::debug!("Git stderr: {}", String::from_utf8_lossy(&stderr)); - tracing::debug!( - "Git stderr: {}", - String::from_utf8(cgires.1).unwrap_or("".to_string()) - ); + Ok::<_, JoshError>(response) + } + .instrument(tracing::span!( + tracing::Level::INFO, + "hyper_cgi / git-http-backend" + )) + .await?; // This is chained as a seperate future to make sure that // it is executed in all cases. std::mem::drop(temp_ns); - Ok(cgires.0) + Ok(cgi_response) } async fn serve_query( @@ -1433,7 +1434,7 @@ async fn serve_query( }) } -#[tracing::instrument] +#[tracing::instrument(skip(serv))] async fn prepare_namespace( serv: Arc, meta: &josh_proxy::MetaConfig, @@ -1460,18 +1461,20 @@ async fn prepare_namespace( Ok(temp_ns) } -fn trace_http_response_code(trace_span: Span, http_status: StatusCode) { +fn trace_http_response(trace_span: Span, response: &Response) { + use opentelemetry_semantic_conventions::trace::HTTP_RESPONSE_STATUS_CODE; + macro_rules! trace { ($level:expr) => {{ tracing::event!( parent: trace_span, $level, - http_status = http_status.as_u16() + { HTTP_RESPONSE_STATUS_CODE } = response.status().as_u16() ); }}; } - match http_status.as_u16() { + match response.status().as_u16() { s if s < 400 => trace!(tracing::Level::TRACE), s if s < 500 => trace!(tracing::Level::WARN), _ => trace!(tracing::Level::ERROR), @@ -1504,8 +1507,47 @@ fn make_upstream(remotes: &Vec) -> josh::JoshResult, + req: Request, +) -> Result, hyper::http::Error> { + use opentelemetry_semantic_conventions::trace::{HTTP_REQUEST_METHOD, URL_PATH}; + + let span = tracing::Span::current(); + span.record(URL_PATH, req.uri().path()); + span.record(HTTP_REQUEST_METHOD, req.method().to_string()); + + async move { + let response = if let Ok(req_auth) = josh_proxy::auth::strip_auth(req) { + call_service(proxy_service, req_auth) + .await + .unwrap_or_else(|e| { + make_response( + hyper::Body::from(match e { + JoshError(s) => s, + }), + hyper::StatusCode::INTERNAL_SERVER_ERROR, + ) + }) + } else { + make_response( + hyper::Body::from("JoshError(strip_auth)"), + hyper::StatusCode::INTERNAL_SERVER_ERROR, + ) + }; + + trace_http_response(span.clone(), &response); + response + } + .map(Ok::<_, hyper::http::Error>) + .await +} + #[tokio::main] async fn run_proxy() -> josh::JoshResult { + init_trace(); + let addr = format!("[::]:{}", ARGS.port).parse()?; let upstream = make_upstream(&ARGS.remote).map_err(|e| { eprintln!("Upstream parsing error: {}", &e); @@ -1537,42 +1579,9 @@ async fn run_proxy() -> josh::JoshResult { let make_service = make_service_fn(move |_| { let proxy_service = proxy_service.clone(); - - let service = service_fn(move |_req| { + let service = service_fn(move |req| { let proxy_service = proxy_service.clone(); - - let _s = tracing::span!( - tracing::Level::TRACE, - "http_request", - path = _req.uri().path() - ); - let s = _s; - - async move { - let r = if let Ok(req_auth) = josh_proxy::auth::strip_auth(_req) { - match call_service(proxy_service, req_auth) - .instrument(s.clone()) - .await - { - Ok(r) => r, - Err(e) => make_response( - hyper::Body::from(match e { - JoshError(s) => s, - }), - hyper::StatusCode::INTERNAL_SERVER_ERROR, - ), - } - } else { - make_response( - hyper::Body::from("JoshError(strip_auth)"), - hyper::StatusCode::INTERNAL_SERVER_ERROR, - ) - }; - let _e = s.enter(); - trace_http_response_code(s.clone(), r.status()); - r - } - .map(Ok::<_, hyper::http::Error>) + handle_http_request(proxy_service, req) }); future::ok::<_, hyper::http::Error>(service) @@ -1595,6 +1604,7 @@ async fn run_proxy() -> josh::JoshResult { r = server_future => println!("http server exited: {:?}", r), ); } + Ok(0) } @@ -1855,25 +1865,8 @@ async fn shutdown_signal() { println!("shutdown_signal"); } -fn main() { - // josh-proxy creates a symlink to itself as a git update hook. - // When it gets called by git as that hook, the binary name will end - // end in "/update" and this will not be a new server. - // The update hook will then make a http request back to the main - // process to do the actual computation while taking advantage of the - // cached data already loaded into the main processe's memory. - if let [a0, a1, a2, a3, ..] = &std::env::args().collect::>().as_slice() { - if a0.ends_with("/update") { - std::process::exit(update_hook(a1, a2, a3).unwrap_or(1)); - } - } - - if let [a0, ..] = &std::env::args().collect::>().as_slice() { - if a0.ends_with("/pre-receive") { - println!("josh-proxy"); - std::process::exit(pre_receive_hook().unwrap_or(1)); - } - } +fn init_trace() { + use opentelemetry_otlp::WithExportConfig; // Set format for propagating tracing context. This allows to link traces from one invocation // of josh to the next @@ -1889,11 +1882,11 @@ fn main() { _ => tracing_subscriber::EnvFilter::new("josh=trace,josh_proxy=trace"), }; + let service_name = std::env::var("JOSH_SERVICE_NAME").unwrap_or("josh-proxy".to_owned()); + if let Ok(endpoint) = std::env::var("JOSH_JAEGER_ENDPOINT") { let tracer = opentelemetry_jaeger::new_agent_pipeline() - .with_service_name( - std::env::var("JOSH_SERVICE_NAME").unwrap_or("josh-proxy".to_owned()), - ) + .with_service_name(service_name) .with_endpoint(endpoint) .install_simple() .expect("can't install opentelemetry pipeline"); @@ -1903,6 +1896,31 @@ fn main() { .and_then(fmt_layer) .and_then(telemetry_layer) .with_subscriber(tracing_subscriber::Registry::default()); + + tracing::subscriber::set_global_default(subscriber).expect("can't set_global_default"); + } else if let Ok(endpoint) = std::env::var("JOSH_OTLP_ENDPOINT") { + use opentelemetry::KeyValue; + + let resource = + opentelemetry::sdk::Resource::new(vec![KeyValue::new("service.name", service_name)]); + + let tracer = opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter( + opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint(endpoint), + ) + .with_trace_config(opentelemetry::sdk::trace::config().with_resource(resource)) + .install_simple() + .expect("can't install opentelemetry pipeline"); + + let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer); + let subscriber = filter + .and_then(fmt_layer) + .and_then(telemetry_layer) + .with_subscriber(tracing_subscriber::Registry::default()); + tracing::subscriber::set_global_default(subscriber).expect("can't set_global_default"); } else { let subscriber = filter @@ -1910,6 +1928,29 @@ fn main() { .with_subscriber(tracing_subscriber::Registry::default()); tracing::subscriber::set_global_default(subscriber).expect("can't set_global_default"); }; +} + +fn main() { + // josh-proxy creates a symlink to itself as a git update hook. + // When it gets called by git as that hook, the binary name will end + // end in "/update" and this will not be a new server. + // The update hook will then make a http request back to the main + // process to do the actual computation while taking advantage of the + // cached data already loaded into the main processe's memory. + if let [a0, a1, a2, a3, ..] = &std::env::args().collect::>().as_slice() { + if a0.ends_with("/update") { + std::process::exit(update_hook(a1, a2, a3).unwrap_or(1)); + } + } + + if let [a0, ..] = &std::env::args().collect::>().as_slice() { + if a0.ends_with("/pre-receive") { + println!("josh-proxy"); + std::process::exit(pre_receive_hook().unwrap_or(1)); + } + } - std::process::exit(run_proxy().unwrap_or(1)); + let exit_code = run_proxy().unwrap_or(1); + global::shutdown_tracer_provider(); + std::process::exit(exit_code); }