From cb9dcea81d728ca9eaa5839cba605d20549fc8b8 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Mon, 12 Feb 2024 00:34:40 +0000 Subject: [PATCH] feat: introduce `EdgeRuntime.getRuntimeMetrics` --- Cargo.lock | 1 + crates/base/src/deno_runtime.rs | 11 +- crates/base/src/rt_worker/worker.rs | 35 +++--- crates/base/src/rt_worker/worker_ctx.rs | 116 +++++++++--------- crates/base/src/rt_worker/worker_pool.rs | 8 ++ crates/base/src/server.rs | 35 +++++- .../base/src/utils/integration_test_helper.rs | 3 +- crates/base/tests/linux_pku_sigsegv_test.rs | 3 +- crates/base/tests/main_worker_tests.rs | 3 +- crates/sb_core/Cargo.toml | 3 +- crates/sb_core/js/main_worker.js | 3 +- crates/sb_core/lib.rs | 97 ++++++++++++--- crates/sb_workers/context.rs | 5 +- examples/main/index.ts | 5 + 14 files changed, 223 insertions(+), 105 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a96240151..25f099755 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4238,6 +4238,7 @@ dependencies = [ "deno_websocket", "deno_webstorage", "encoding_rs", + "enum-as-inner 0.6.0", "fs3", "futures", "hyper 0.14.28", diff --git a/crates/base/src/deno_runtime.rs b/crates/base/src/deno_runtime.rs index 3d621f4c8..b4a76423d 100644 --- a/crates/base/src/deno_runtime.rs +++ b/crates/base/src/deno_runtime.rs @@ -421,7 +421,7 @@ impl DenoRuntime { // the task from the other threads. // let mut current_thread_id = std::thread::current().id(); - let result = match poll_fn(|cx| { + let poll_result = poll_fn(|cx| { // INVARIANT: Only can steal current task by other threads when LIFO // task scheduler heuristic disabled. Turning off the heuristic is // unstable now, so it's not considered. @@ -487,8 +487,9 @@ impl DenoRuntime { poll_result }) - .await - { + .await; + + let result = match poll_result { Err(err) => Err(anyhow!("event loop error: {}", err)), Ok(_) => match mod_result_rx.await { Err(e) => { @@ -576,6 +577,7 @@ mod test { conf: { WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { worker_pool_tx, + shared_metric_src: None, event_worker_metric_src: None, }) }, @@ -620,6 +622,7 @@ mod test { conf: { WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { worker_pool_tx, + shared_metric_src: None, event_worker_metric_src: None, }) }, @@ -686,6 +689,7 @@ mod test { conf: { WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { worker_pool_tx, + shared_metric_src: None, event_worker_metric_src: None, }) }, @@ -748,6 +752,7 @@ mod test { } else { WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { worker_pool_tx, + shared_metric_src: None, event_worker_metric_src: None, }) } diff --git a/crates/base/src/rt_worker/worker.rs b/crates/base/src/rt_worker/worker.rs index 106fca163..cb56e13e1 100644 --- a/crates/base/src/rt_worker/worker.rs +++ b/crates/base/src/rt_worker/worker.rs @@ -11,7 +11,7 @@ use event_worker::events::{ use futures_util::FutureExt; use log::{debug, error}; use sb_core::conn_sync::ConnSync; -use sb_core::{RuntimeMetricSource, WorkerMetricSource}; +use sb_core::{MetricSource, RuntimeMetricSource, WorkerMetricSource}; use sb_workers::context::{UserWorkerMsgs, WorkerContextInitOpts}; use std::any::Any; use std::future::{pending, Future}; @@ -88,7 +88,7 @@ impl Worker { UnboundedSender, UnboundedReceiver, ), - booter_signal: Sender>, + booter_signal: Sender>, termination_token: Option, ) { let worker_name = self.worker_name.clone(); @@ -103,11 +103,7 @@ impl Worker { let method_cloner = self.clone(); let timing = opts.timing.take(); let worker_kind = opts.conf.to_worker_kind(); - let maybe_event_worker_metric_src = opts - .conf - .as_main_worker() - .as_ref() - .and_then(|it| it.event_worker_metric_src.clone()); + let maybe_main_worker_opts = opts.conf.as_main_worker().cloned(); let cancel = self.cancel.clone(); let rt = if worker_kind.is_user_worker() { @@ -125,24 +121,29 @@ impl Worker { let result = match DenoRuntime::new(opts).await { Ok(mut new_runtime) => { - let metric = { + let metric_src = { let js_runtime = &mut new_runtime.js_runtime; - let metric = WorkerMetricSource::from_js_runtime(js_runtime); + let metric_src = WorkerMetricSource::from_js_runtime(js_runtime); if worker_kind.is_main_worker() { + let opts = maybe_main_worker_opts.unwrap(); let state = js_runtime.op_state(); let mut state_mut = state.borrow_mut(); - - state_mut.put(RuntimeMetricSource::new( - metric.clone(), - maybe_event_worker_metric_src, - )); + let metric_src = RuntimeMetricSource::new( + metric_src.clone(), + opts.event_worker_metric_src + .and_then(|it| it.into_worker().ok()), + opts.shared_metric_src, + ); + + state_mut.put(metric_src.clone()); + MetricSource::Runtime(metric_src) + } else { + MetricSource::Worker(metric_src) } - - metric }; - let _ = booter_signal.send(Ok(metric)); + let _ = booter_signal.send(Ok(metric_src)); // CPU TIMER let (termination_event_tx, termination_event_rx) = diff --git a/crates/base/src/rt_worker/worker_ctx.rs b/crates/base/src/rt_worker/worker_ctx.rs index 04e1c080c..f366a2bf6 100644 --- a/crates/base/src/rt_worker/worker_ctx.rs +++ b/crates/base/src/rt_worker/worker_ctx.rs @@ -12,7 +12,7 @@ use event_worker::events::{ use hyper::{Body, Request, Response}; use log::{debug, error}; use sb_core::conn_sync::ConnSync; -use sb_core::WorkerMetricSource; +use sb_core::{MetricSource, SharedMetricSource}; use sb_graph::EszipPayloadKind; use sb_workers::context::{ EventWorkerRuntimeOpts, MainWorkerRuntimeOpts, Timing, UserWorkerMsgs, WorkerContextInitOpts, @@ -310,10 +310,10 @@ impl CreateWorkerArgs { pub async fn create_worker>( init_opts: Opt, -) -> Result<(WorkerMetricSource, mpsc::UnboundedSender), Error> { +) -> Result<(MetricSource, mpsc::UnboundedSender), Error> { let (unix_stream_tx, unix_stream_rx) = mpsc::unbounded_channel::(); let (worker_boot_result_tx, worker_boot_result_rx) = - oneshot::channel::>(); + oneshot::channel::>(); let CreateWorkerArgs(init_opts, maybe_supervisor_policy, maybe_termination_token) = init_opts.into(); @@ -464,13 +464,7 @@ pub async fn create_events_worker( no_module_cache: bool, maybe_entrypoint: Option, termination_token: Option, -) -> Result< - ( - WorkerMetricSource, - mpsc::UnboundedSender, - ), - Error, -> { +) -> Result<(MetricSource, mpsc::UnboundedSender), Error> { let (events_tx, events_rx) = mpsc::unbounded_channel::(); let mut service_path = events_worker_path.clone(); @@ -509,71 +503,79 @@ pub async fn create_user_worker_pool( policy: WorkerPoolPolicy, worker_event_sender: Option>, termination_token: Option, -) -> Result, Error> { +) -> Result<(SharedMetricSource, mpsc::UnboundedSender), Error> { + let metric_src = SharedMetricSource::default(); let (user_worker_msgs_tx, mut user_worker_msgs_rx) = mpsc::unbounded_channel::(); let user_worker_msgs_tx_clone = user_worker_msgs_tx.clone(); - let _handle: tokio::task::JoinHandle> = tokio::spawn(async move { - let token = termination_token.as_ref(); - let mut termination_requested = false; - let mut worker_pool = - WorkerPool::new(policy, worker_event_sender, user_worker_msgs_tx_clone); - - // Note: Keep this loop non-blocking. Spawn a task to run blocking calls. - // Handle errors within tasks and log them - do not bubble up errors. - loop { - tokio::select! { - _ = async { - if let Some(token) = token { - token.inbound.cancelled().await; - } else { - pending::<()>().await; - } - }, if !termination_requested => { - termination_requested = true; - - if worker_pool.user_workers.is_empty() { + let _handle: tokio::task::JoinHandle> = tokio::spawn({ + let metric_src_inner = metric_src.clone(); + async move { + let token = termination_token.as_ref(); + let mut termination_requested = false; + let mut worker_pool = WorkerPool::new( + policy, + metric_src_inner, + worker_event_sender, + user_worker_msgs_tx_clone, + ); + + // Note: Keep this loop non-blocking. Spawn a task to run blocking calls. + // Handle errors within tasks and log them - do not bubble up errors. + loop { + tokio::select! { + _ = async { if let Some(token) = token { - token.outbound.cancel(); + token.inbound.cancelled().await; + } else { + pending::<()>().await; } + }, if !termination_requested => { + termination_requested = true; - break; - } - } + if worker_pool.user_workers.is_empty() { + if let Some(token) = token { + token.outbound.cancel(); + } - msg = user_worker_msgs_rx.recv() => { - match msg { - None => break, - Some(UserWorkerMsgs::Create(worker_options, tx)) => { - worker_pool.create_user_worker(worker_options, tx, termination_token.as_ref().map(|it| it.child_token())); - } - Some(UserWorkerMsgs::Created(key, profile)) => { - worker_pool.add_user_worker(key, profile); - } - Some(UserWorkerMsgs::SendRequest(key, req, res_tx, conn_watch)) => { - worker_pool.send_request(&key, req, res_tx, conn_watch); - } - Some(UserWorkerMsgs::Idle(key)) => { - worker_pool.idle(&key); + break; } - Some(UserWorkerMsgs::Shutdown(key)) => { - worker_pool.shutdown(&key); + } - if let Some(token) = token { - if token.inbound.is_cancelled() && worker_pool.user_workers.is_empty() { - token.outbound.cancel(); + msg = user_worker_msgs_rx.recv() => { + match msg { + None => break, + Some(UserWorkerMsgs::Create(worker_options, tx)) => { + worker_pool.create_user_worker(worker_options, tx, termination_token.as_ref().map(|it| it.child_token())); + } + Some(UserWorkerMsgs::Created(key, profile)) => { + worker_pool.add_user_worker(key, profile); + } + Some(UserWorkerMsgs::SendRequest(key, req, res_tx, conn_watch)) => { + worker_pool.send_request(&key, req, res_tx, conn_watch); + } + Some(UserWorkerMsgs::Idle(key)) => { + worker_pool.idle(&key); + } + Some(UserWorkerMsgs::Shutdown(key)) => { + worker_pool.shutdown(&key); + + if let Some(token) = token { + if token.inbound.is_cancelled() && worker_pool.user_workers.is_empty() { + token.outbound.cancel(); + } } } } } } } - } - Ok(()) + Ok(()) + } }); - Ok(user_worker_msgs_tx) + Ok((metric_src, user_worker_msgs_tx)) } diff --git a/crates/base/src/rt_worker/worker_pool.rs b/crates/base/src/rt_worker/worker_pool.rs index cd9e09d01..e2ee0f408 100644 --- a/crates/base/src/rt_worker/worker_pool.rs +++ b/crates/base/src/rt_worker/worker_pool.rs @@ -7,6 +7,7 @@ use hyper::Body; use log::error; use sb_core::conn_sync::ConnSync; use sb_core::util::sync::AtomicFlag; +use sb_core::SharedMetricSource; use sb_workers::context::{ CreateUserWorkerResult, SendRequestResult, Timing, TimingStatus, UserWorkerMsgs, UserWorkerProfile, WorkerContextInitOpts, WorkerRuntimeOpts, @@ -203,6 +204,7 @@ impl ActiveWorkerRegistry { // send_request is called with UUID pub struct WorkerPool { pub policy: WorkerPoolPolicy, + pub metric_src: SharedMetricSource, pub user_workers: HashMap, pub active_workers: HashMap, pub worker_pool_msgs_tx: mpsc::UnboundedSender, @@ -214,11 +216,13 @@ pub struct WorkerPool { impl WorkerPool { pub(crate) fn new( policy: WorkerPoolPolicy, + metric_src: SharedMetricSource, worker_event_sender: Option>, worker_pool_msgs_tx: mpsc::UnboundedSender, ) -> Self { Self { policy, + metric_src, worker_event_sender, user_workers: HashMap::new(), active_workers: HashMap::new(), @@ -447,6 +451,7 @@ impl WorkerPool { .insert(WorkerId(key, self.policy.supervisor_policy.is_per_worker())); self.user_workers.insert(key, profile); + self.metric_src.incl_active_user_workers(); } pub fn send_request( @@ -552,6 +557,8 @@ impl WorkerPool { }; let _ = notify_tx.send(None); + + self.metric_src.decl_active_user_workers(); } fn retire(&mut self, key: &Uuid) { @@ -570,6 +577,7 @@ impl WorkerPool { if registry.workers.contains(key) { registry.workers.remove(key); + self.metric_src.incl_retired_user_worker(); } } } diff --git a/crates/base/src/server.rs b/crates/base/src/server.rs index 707f5f850..b2a8a3c98 100644 --- a/crates/base/src/server.rs +++ b/crates/base/src/server.rs @@ -8,6 +8,7 @@ use futures_util::Stream; use hyper::{server::conn::Http, service::Service, Body, Request, Response}; use log::{debug, error, info}; use sb_core::conn_sync::ConnSync; +use sb_core::SharedMetricSource; use sb_workers::context::{MainWorkerRuntimeOpts, WorkerRequestMsg}; use std::future::Future; use std::net::IpAddr; @@ -57,15 +58,20 @@ impl Stream for NotifyOnEos { } struct WorkerService { + metric_src: SharedMetricSource, worker_req_tx: mpsc::UnboundedSender, cancel: CancellationToken, } impl WorkerService { - fn new(worker_req_tx: mpsc::UnboundedSender) -> (Self, CancellationToken) { + fn new( + metric_src: SharedMetricSource, + worker_req_tx: mpsc::UnboundedSender, + ) -> (Self, CancellationToken) { let cancel = CancellationToken::new(); ( Self { + metric_src, worker_req_tx, cancel: cancel.clone(), }, @@ -86,6 +92,7 @@ impl Service> for WorkerService { fn call(&mut self, req: Request) -> Self::Future { // create a response in a future. let cancel = self.cancel.child_token(); + let metric_src = self.metric_src.clone(); let worker_req_tx = self.worker_req_tx.clone(); let fut = async move { let (res_tx, res_rx) = oneshot::channel::, hyper::Error>>(); @@ -99,12 +106,16 @@ impl Service> for WorkerService { }; worker_req_tx.send(msg)?; + metric_src.incl_received_requests(); tokio::spawn({ + let metric_src_inner = metric_src.clone(); let cancel = cancel.clone(); + async move { tokio::select! { _ = cancel.cancelled() => { + metric_src_inner.incl_handled_requests(); if let Err(ex) = ob_conn_watch_tx.send(ConnSync::Recv) { error!("can't update connection watcher: {}", ex.to_string()); } @@ -116,7 +127,15 @@ impl Service> for WorkerService { } }); - let res = match res_rx.await? { + let res = match res_rx.await { + Ok(res) => res, + Err(err) => { + metric_src.incl_handled_requests(); + return Err(err.into()); + } + }; + + let res = match res { Ok(res) => res, Err(e) => { error!( @@ -164,6 +183,7 @@ pub struct Server { main_worker_req_tx: mpsc::UnboundedSender, callback_tx: Option>, termination_token: TerminationToken, + metric_src: SharedMetricSource, } impl Server { @@ -190,7 +210,7 @@ impl Server { let events_path = Path::new(&events_service_path); let events_path_buf = events_path.to_path_buf(); - let (metric, sender) = create_events_worker( + let (event_worker_metric, sender) = create_events_worker( events_path_buf, import_map_path.clone(), no_module_cache, @@ -200,13 +220,13 @@ impl Server { .await?; worker_events_tx = Some(sender); - Some(metric) + Some(event_worker_metric) } else { None }; // Create a user worker pool - let worker_pool_tx = create_user_worker_pool( + let (shared_metric_src, worker_pool_tx) = create_user_worker_pool( maybe_user_worker_policy.unwrap_or_default(), worker_events_tx, None, @@ -221,6 +241,7 @@ impl Server { no_module_cache, MainWorkerRuntimeOpts { worker_pool_tx, + shared_metric_src: Some(shared_metric_src.clone()), event_worker_metric_src, }, maybe_main_entrypoint, @@ -235,6 +256,7 @@ impl Server { main_worker_req_tx, callback_tx, termination_token, + metric_src: shared_metric_src, }) } @@ -259,6 +281,7 @@ impl Server { loop { let main_worker_req_tx = self.main_worker_req_tx.clone(); + let metric_src = self.metric_src.clone(); tokio::select! { msg = listener.accept() => { @@ -267,7 +290,7 @@ impl Server { tokio::task::spawn({ let event_tx = event_tx.clone(); async move { - let (service, cancel) = WorkerService::new(main_worker_req_tx); + let (service, cancel) = WorkerService::new(metric_src, main_worker_req_tx); let _guard = cancel.drop_guard(); let conn_fut = Http::new() diff --git a/crates/base/src/utils/integration_test_helper.rs b/crates/base/src/utils/integration_test_helper.rs index d5db7e268..cf7cdf0ac 100644 --- a/crates/base/src/utils/integration_test_helper.rs +++ b/crates/base/src/utils/integration_test_helper.rs @@ -192,7 +192,7 @@ impl TestBedBuilder { } pub async fn build(self) -> TestBed { - let (worker_pool_tx, pool_termination_token) = { + let ((_, worker_pool_tx), pool_termination_token) = { let token = TerminationToken::new(); ( create_user_worker_pool( @@ -219,6 +219,7 @@ impl TestBedBuilder { maybe_module_code: None, conf: WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { worker_pool_tx, + shared_metric_src: None, event_worker_metric_src: None, }), }; diff --git a/crates/base/tests/linux_pku_sigsegv_test.rs b/crates/base/tests/linux_pku_sigsegv_test.rs index 3923e9f73..df1abd626 100644 --- a/crates/base/tests/linux_pku_sigsegv_test.rs +++ b/crates/base/tests/linux_pku_sigsegv_test.rs @@ -21,7 +21,7 @@ async fn test_not_trigger_pku_sigsegv_due_to_jit_compilation_non_cli() { let main_termination_token = TerminationToken::new(); // create a user worker pool - let worker_pool_tx = create_user_worker_pool( + let (_, worker_pool_tx) = create_user_worker_pool( integration_test_helper::test_user_worker_pool_policy(), None, Some(pool_termination_token.clone()), @@ -41,6 +41,7 @@ async fn test_not_trigger_pku_sigsegv_due_to_jit_compilation_non_cli() { maybe_module_code: None, conf: WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { worker_pool_tx, + shared_metric_src: None, event_worker_metric_src: None, }), }; diff --git a/crates/base/tests/main_worker_tests.rs b/crates/base/tests/main_worker_tests.rs index 1b52eda44..a6035b3d8 100644 --- a/crates/base/tests/main_worker_tests.rs +++ b/crates/base/tests/main_worker_tests.rs @@ -111,7 +111,7 @@ async fn test_main_worker_boot_error() { let main_termination_token = TerminationToken::new(); // create a user worker pool - let worker_pool_tx = create_user_worker_pool( + let (_, worker_pool_tx) = create_user_worker_pool( test_user_worker_pool_policy(), None, Some(pool_termination_token.clone()), @@ -131,6 +131,7 @@ async fn test_main_worker_boot_error() { maybe_module_code: None, conf: WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { worker_pool_tx, + shared_metric_src: None, event_worker_metric_src: None, }), }; diff --git a/crates/sb_core/Cargo.toml b/crates/sb_core/Cargo.toml index cb3d8d922..c0c94c9d9 100644 --- a/crates/sb_core/Cargo.toml +++ b/crates/sb_core/Cargo.toml @@ -47,4 +47,5 @@ encoding_rs = { version = "=0.8.33" } base64.workspace = true futures.workspace = true percent-encoding.workspace = true -scopeguard.workspace = true \ No newline at end of file +scopeguard.workspace = true +enum-as-inner.workspace = true \ No newline at end of file diff --git a/crates/sb_core/js/main_worker.js b/crates/sb_core/js/main_worker.js index c1c231a52..a92da55fd 100644 --- a/crates/sb_core/js/main_worker.js +++ b/crates/sb_core/js/main_worker.js @@ -1,5 +1,4 @@ import { SUPABASE_USER_WORKERS } from "ext:sb_user_workers/user_workers.js"; -import { getterOnly } from "ext:sb_core_main_js/js/fieldUtils.js"; import { applyWatcherRid } from "ext:sb_core_main_js/js/http.js"; import { core } from "ext:core/mod.js"; @@ -9,7 +8,7 @@ Object.defineProperty(globalThis, 'EdgeRuntime', { get() { return { userWorkers: SUPABASE_USER_WORKERS, - runtimeMetrics: getterOnly(() => ops.op_runtime_metrics()), + getRuntimeMetrics: () => /* async */ ops.op_runtime_metrics(), applyConnectionWatcher: (src, dest) => { applyWatcherRid(src, dest); } diff --git a/crates/sb_core/lib.rs b/crates/sb_core/lib.rs index 7182ec8d7..4bed74689 100644 --- a/crates/sb_core/lib.rs +++ b/crates/sb_core/lib.rs @@ -1,11 +1,13 @@ use std::cell::RefCell; use std::rc::Rc; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use deno_core::error::AnyError; use deno_core::v8::IsolateHandle; use deno_core::OpState; use deno_core::{op2, JsRuntime}; +use enum_as_inner::EnumAsInner; use futures::task::AtomicWaker; use futures::FutureExt; use log::error; @@ -27,6 +29,49 @@ pub mod runtime; pub mod transpiler; pub mod util; +#[derive(Debug, Default, Clone)] +pub struct SharedMetricSource { + active_user_workers: Arc, + retired_user_workers: Arc, + received_requests: Arc, + handled_requests: Arc, +} + +impl SharedMetricSource { + pub fn incl_active_user_workers(&self) { + self.active_user_workers.fetch_add(1, Ordering::Relaxed); + } + + pub fn decl_active_user_workers(&self) { + self.active_user_workers.fetch_sub(1, Ordering::Relaxed); + } + + pub fn incl_retired_user_worker(&self) { + self.retired_user_workers.fetch_add(1, Ordering::Relaxed); + } + + pub fn incl_received_requests(&self) { + self.received_requests.fetch_add(1, Ordering::Relaxed); + } + + pub fn incl_handled_requests(&self) { + self.handled_requests.fetch_add(1, Ordering::Relaxed); + } + + pub fn reset(&self) { + self.active_user_workers.store(0, Ordering::Relaxed); + self.retired_user_workers.store(0, Ordering::Relaxed); + self.received_requests.store(0, Ordering::Relaxed); + self.handled_requests.store(0, Ordering::Relaxed); + } +} + +#[derive(Debug, Clone, EnumAsInner)] +pub enum MetricSource { + Worker(WorkerMetricSource), + Runtime(RuntimeMetricSource), +} + #[derive(Debug, Clone)] pub struct WorkerMetricSource { handle: IsolateHandle, @@ -55,15 +100,21 @@ impl WorkerMetricSource { #[derive(Debug, Clone)] pub struct RuntimeMetricSource { - main: WorkerMetricSource, - event: Option, + pub main: WorkerMetricSource, + pub event: Option, + pub shared: SharedMetricSource, } impl RuntimeMetricSource { - pub fn new(main: WorkerMetricSource, maybe_event: Option) -> Self { + pub fn new( + main: WorkerMetricSource, + maybe_event: Option, + maybe_shared: Option, + ) -> Self { Self { main, event: maybe_event, + shared: maybe_shared.unwrap_or_default(), } } @@ -158,16 +209,33 @@ struct RuntimeHeapStatistics { #[derive(Debug, Serialize, Default)] #[serde(rename_all = "camelCase")] -struct RuntimeMetrics { - #[serde(flatten)] - heap_stats: RuntimeHeapStatistics, - +struct RuntimeSharedStatistics { active_user_workers_count: usize, retired_user_workers_count: usize, received_requests_count: usize, handled_requests_count: usize, } +impl RuntimeSharedStatistics { + fn from_shared_metric_src(src: &SharedMetricSource) -> Self { + Self { + active_user_workers_count: src.active_user_workers.load(Ordering::Relaxed), + retired_user_workers_count: src.retired_user_workers.load(Ordering::Relaxed), + received_requests_count: src.received_requests.load(Ordering::Relaxed), + handled_requests_count: src.handled_requests.load(Ordering::Relaxed), + } + } +} + +#[derive(Debug, Serialize, Default)] +#[serde(rename_all = "camelCase")] +struct RuntimeMetrics { + #[serde(flatten)] + heap_stats: RuntimeHeapStatistics, + #[serde(flatten)] + shared_stats: RuntimeSharedStatistics, +} + #[op2(fast)] fn op_is_terminal(state: &mut OpState, rid: u32) -> Result { let handle = state.resource_table.get_handle(rid)?; @@ -187,14 +255,15 @@ fn op_console_size(_state: &mut OpState, #[buffer] _result: &mut [u32]) -> Resul #[op2(async)] #[serde] async fn op_runtime_metrics(state: Rc>) -> Result { - let state = state.borrow(); let mut runtime_metrics = RuntimeMetrics::default(); - - runtime_metrics.heap_stats = state - .borrow::() - .clone() - .get_heap_statistics() - .await; + let mut runtime_metric_src = { + let state = state.borrow(); + state.borrow::().clone() + }; + + runtime_metrics.heap_stats = runtime_metric_src.get_heap_statistics().await; + runtime_metrics.shared_stats = + RuntimeSharedStatistics::from_shared_metric_src(&runtime_metric_src.shared); Ok(runtime_metrics) } diff --git a/crates/sb_workers/context.rs b/crates/sb_workers/context.rs index 22604e0aa..371200f88 100644 --- a/crates/sb_workers/context.rs +++ b/crates/sb_workers/context.rs @@ -5,7 +5,7 @@ use event_worker::events::WorkerEventWithMetadata; use hyper::{Body, Request, Response}; use sb_core::conn_sync::ConnSync; use sb_core::util::sync::AtomicFlag; -use sb_core::WorkerMetricSource; +use sb_core::{MetricSource, SharedMetricSource}; use std::path::PathBuf; use std::sync::atomic::AtomicUsize; use std::{collections::HashMap, sync::Arc}; @@ -76,7 +76,8 @@ pub struct UserWorkerProfile { #[derive(Debug, Clone)] pub struct MainWorkerRuntimeOpts { pub worker_pool_tx: mpsc::UnboundedSender, - pub event_worker_metric_src: Option, + pub shared_metric_src: Option, + pub event_worker_metric_src: Option, } #[derive(Debug, Clone)] diff --git a/examples/main/index.ts b/examples/main/index.ts index a12f2d12b..6d04130f8 100644 --- a/examples/main/index.ts +++ b/examples/main/index.ts @@ -14,6 +14,11 @@ serve(async (req: Request) => { ); } + if (pathname === '/_internal/metric') { + const metric = await EdgeRuntime.getRuntimeMetrics(); + return Response.json(metric); + } + const path_parts = pathname.split('/'); const service_name = path_parts[1];