diff --git a/Cargo.lock b/Cargo.lock index a96240151..25f099755 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4238,6 +4238,7 @@ dependencies = [ "deno_websocket", "deno_webstorage", "encoding_rs", + "enum-as-inner 0.6.0", "fs3", "futures", "hyper 0.14.28", diff --git a/crates/base/src/deno_runtime.rs b/crates/base/src/deno_runtime.rs index 142b0bc7a..b4a76423d 100644 --- a/crates/base/src/deno_runtime.rs +++ b/crates/base/src/deno_runtime.rs @@ -421,7 +421,7 @@ impl DenoRuntime { // the task from the other threads. // let mut current_thread_id = std::thread::current().id(); - let result = match poll_fn(|cx| { + let poll_result = poll_fn(|cx| { // INVARIANT: Only can steal current task by other threads when LIFO // task scheduler heuristic disabled. Turning off the heuristic is // unstable now, so it's not considered. @@ -487,8 +487,9 @@ impl DenoRuntime { poll_result }) - .await - { + .await; + + let result = match poll_result { Err(err) => Err(anyhow!("event loop error: {}", err)), Ok(_) => match mod_result_rx.await { Err(e) => { @@ -573,7 +574,13 @@ mod test { maybe_module_code: Some(FastString::from(String::from( "Deno.serve((req) => new Response('Hello World'));", ))), - conf: { WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { worker_pool_tx }) }, + conf: { + WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { + worker_pool_tx, + shared_metric_src: None, + event_worker_metric_src: None, + }) + }, }) .await .expect("It should not panic"); @@ -612,7 +619,13 @@ mod test { maybe_eszip: Some(EszipPayloadKind::VecKind(eszip_code)), maybe_entrypoint: None, maybe_module_code: None, - conf: { WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { worker_pool_tx }) }, + conf: { + WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { + worker_pool_tx, + shared_metric_src: None, + event_worker_metric_src: None, + }) + }, }) .await; @@ -673,7 +686,13 @@ mod test { maybe_eszip: Some(EszipPayloadKind::VecKind(eszip_code)), maybe_entrypoint: None, maybe_module_code: None, - conf: { WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { worker_pool_tx }) }, + conf: { + WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { + worker_pool_tx, + shared_metric_src: None, + event_worker_metric_src: None, + }) + }, }) .await; @@ -731,7 +750,11 @@ mod test { if let Some(uc) = user_conf { uc } else { - WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { worker_pool_tx }) + WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { + worker_pool_tx, + shared_metric_src: None, + event_worker_metric_src: None, + }) } }, }) diff --git a/crates/base/src/rt_worker/worker.rs b/crates/base/src/rt_worker/worker.rs index 94fa2187a..cb56e13e1 100644 --- a/crates/base/src/rt_worker/worker.rs +++ b/crates/base/src/rt_worker/worker.rs @@ -11,6 +11,7 @@ use event_worker::events::{ use futures_util::FutureExt; use log::{debug, error}; use sb_core::conn_sync::ConnSync; +use sb_core::{MetricSource, RuntimeMetricSource, WorkerMetricSource}; use sb_workers::context::{UserWorkerMsgs, WorkerContextInitOpts}; use std::any::Any; use std::future::{pending, Future}; @@ -87,7 +88,7 @@ impl Worker { UnboundedSender, UnboundedReceiver, ), - booter_signal: Sender>, + booter_signal: Sender>, termination_token: Option, ) { let worker_name = self.worker_name.clone(); @@ -101,10 +102,11 @@ impl Worker { let method_cloner = self.clone(); let timing = opts.timing.take(); - let is_user_worker = opts.conf.is_user_worker(); + let worker_kind = opts.conf.to_worker_kind(); + let maybe_main_worker_opts = opts.conf.as_main_worker().cloned(); let cancel = self.cancel.clone(); - let rt = if is_user_worker { + let rt = if worker_kind.is_user_worker() { &rt::USER_WORKER_RT } else { &rt::PRIMARY_WORKER_RT @@ -112,13 +114,36 @@ impl Worker { let _worker_handle = rt.spawn_pinned(move || { tokio::task::spawn_local(async move { - let (maybe_cpu_usage_metrics_tx, maybe_cpu_usage_metrics_rx) = is_user_worker + let (maybe_cpu_usage_metrics_tx, maybe_cpu_usage_metrics_rx) = worker_kind + .is_user_worker() .then(unbounded_channel::) .unzip(); let result = match DenoRuntime::new(opts).await { Ok(mut new_runtime) => { - let _ = booter_signal.send(Ok(())); + let metric_src = { + let js_runtime = &mut new_runtime.js_runtime; + let metric_src = WorkerMetricSource::from_js_runtime(js_runtime); + + if worker_kind.is_main_worker() { + let opts = maybe_main_worker_opts.unwrap(); + let state = js_runtime.op_state(); + let mut state_mut = state.borrow_mut(); + let metric_src = RuntimeMetricSource::new( + metric_src.clone(), + opts.event_worker_metric_src + .and_then(|it| it.into_worker().ok()), + opts.shared_metric_src, + ); + + state_mut.put(metric_src.clone()); + MetricSource::Runtime(metric_src) + } else { + MetricSource::Worker(metric_src) + } + }; + + let _ = booter_signal.send(Ok(metric_src)); // CPU TIMER let (termination_event_tx, termination_event_rx) = @@ -127,7 +152,7 @@ impl Worker { let _cpu_timer; // TODO: Allow customization of supervisor - let termination_fut = if is_user_worker { + let termination_fut = if worker_kind.is_user_worker() { // cputimer is returned from supervisor and assigned here to keep it in scope. let Ok(maybe_timer) = create_supervisor( worker_key.unwrap_or(Uuid::nil()), @@ -209,7 +234,7 @@ impl Worker { let result = data.await; if let Some(token) = termination_token.as_ref() { - if !is_user_worker { + if !worker_kind.is_user_worker() { let _ = termination_fut.await; } diff --git a/crates/base/src/rt_worker/worker_ctx.rs b/crates/base/src/rt_worker/worker_ctx.rs index ed5ab9eae..f366a2bf6 100644 --- a/crates/base/src/rt_worker/worker_ctx.rs +++ b/crates/base/src/rt_worker/worker_ctx.rs @@ -12,6 +12,7 @@ use event_worker::events::{ use hyper::{Body, Request, Response}; use log::{debug, error}; use sb_core::conn_sync::ConnSync; +use sb_core::{MetricSource, SharedMetricSource}; use sb_graph::EszipPayloadKind; use sb_workers::context::{ EventWorkerRuntimeOpts, MainWorkerRuntimeOpts, Timing, UserWorkerMsgs, WorkerContextInitOpts, @@ -309,9 +310,10 @@ impl CreateWorkerArgs { pub async fn create_worker>( init_opts: Opt, -) -> Result, Error> { - let (worker_boot_result_tx, worker_boot_result_rx) = oneshot::channel::>(); +) -> Result<(MetricSource, mpsc::UnboundedSender), Error> { let (unix_stream_tx, unix_stream_rx) = mpsc::unbounded_channel::(); + let (worker_boot_result_tx, worker_boot_result_rx) = + oneshot::channel::>(); let CreateWorkerArgs(init_opts, maybe_supervisor_policy, maybe_termination_token) = init_opts.into(); @@ -370,7 +372,7 @@ pub async fn create_worker>( bail!(err) } - Ok(_) => { + Ok(metric) => { let elapsed = worker_struct_ref .worker_boot_start_time .elapsed() @@ -384,7 +386,7 @@ pub async fn create_worker>( worker_struct_ref.event_metadata.clone(), ); - Ok(worker_req_tx) + Ok((metric, worker_req_tx)) } } } else { @@ -422,7 +424,7 @@ pub async fn create_main_worker( main_worker_path: PathBuf, import_map_path: Option, no_module_cache: bool, - user_worker_msgs_tx: mpsc::UnboundedSender, + runtime_opts: MainWorkerRuntimeOpts, maybe_entrypoint: Option, termination_token: Option, ) -> Result, Error> { @@ -435,7 +437,7 @@ pub async fn create_main_worker( } } - let main_worker_req_tx = create_worker(( + let (_, sender) = create_worker(( WorkerContextInitOpts { service_path, import_map_path, @@ -445,9 +447,7 @@ pub async fn create_main_worker( maybe_eszip, maybe_entrypoint, maybe_module_code: None, - conf: WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { - worker_pool_tx: user_worker_msgs_tx, - }), + conf: WorkerRuntimeOpts::MainWorker(runtime_opts), env_vars: std::env::vars().collect(), }, termination_token, @@ -455,7 +455,7 @@ pub async fn create_main_worker( .await .map_err(|err| anyhow!("main worker boot error: {}", err))?; - Ok(main_worker_req_tx) + Ok(sender) } pub async fn create_events_worker( @@ -464,7 +464,7 @@ pub async fn create_events_worker( no_module_cache: bool, maybe_entrypoint: Option, termination_token: Option, -) -> Result, Error> { +) -> Result<(MetricSource, mpsc::UnboundedSender), Error> { let (events_tx, events_rx) = mpsc::unbounded_channel::(); let mut service_path = events_worker_path.clone(); @@ -478,7 +478,7 @@ pub async fn create_events_worker( } } - let _ = create_worker(( + let (metric, _) = create_worker(( WorkerContextInitOpts { service_path, no_module_cache, @@ -496,78 +496,86 @@ pub async fn create_events_worker( .await .map_err(|err| anyhow!("events worker boot error: {}", err))?; - Ok(events_tx) + Ok((metric, events_tx)) } pub async fn create_user_worker_pool( policy: WorkerPoolPolicy, worker_event_sender: Option>, termination_token: Option, -) -> Result, Error> { +) -> Result<(SharedMetricSource, mpsc::UnboundedSender), Error> { + let metric_src = SharedMetricSource::default(); let (user_worker_msgs_tx, mut user_worker_msgs_rx) = mpsc::unbounded_channel::(); let user_worker_msgs_tx_clone = user_worker_msgs_tx.clone(); - let _handle: tokio::task::JoinHandle> = tokio::spawn(async move { - let token = termination_token.as_ref(); - let mut termination_requested = false; - let mut worker_pool = - WorkerPool::new(policy, worker_event_sender, user_worker_msgs_tx_clone); - - // Note: Keep this loop non-blocking. Spawn a task to run blocking calls. - // Handle errors within tasks and log them - do not bubble up errors. - loop { - tokio::select! { - _ = async { - if let Some(token) = token { - token.inbound.cancelled().await; - } else { - pending::<()>().await; - } - }, if !termination_requested => { - termination_requested = true; - - if worker_pool.user_workers.is_empty() { + let _handle: tokio::task::JoinHandle> = tokio::spawn({ + let metric_src_inner = metric_src.clone(); + async move { + let token = termination_token.as_ref(); + let mut termination_requested = false; + let mut worker_pool = WorkerPool::new( + policy, + metric_src_inner, + worker_event_sender, + user_worker_msgs_tx_clone, + ); + + // Note: Keep this loop non-blocking. Spawn a task to run blocking calls. + // Handle errors within tasks and log them - do not bubble up errors. + loop { + tokio::select! { + _ = async { if let Some(token) = token { - token.outbound.cancel(); + token.inbound.cancelled().await; + } else { + pending::<()>().await; } + }, if !termination_requested => { + termination_requested = true; - break; - } - } + if worker_pool.user_workers.is_empty() { + if let Some(token) = token { + token.outbound.cancel(); + } - msg = user_worker_msgs_rx.recv() => { - match msg { - None => break, - Some(UserWorkerMsgs::Create(worker_options, tx)) => { - worker_pool.create_user_worker(worker_options, tx, termination_token.as_ref().map(|it| it.child_token())); - } - Some(UserWorkerMsgs::Created(key, profile)) => { - worker_pool.add_user_worker(key, profile); - } - Some(UserWorkerMsgs::SendRequest(key, req, res_tx, conn_watch)) => { - worker_pool.send_request(&key, req, res_tx, conn_watch); - } - Some(UserWorkerMsgs::Idle(key)) => { - worker_pool.idle(&key); + break; } - Some(UserWorkerMsgs::Shutdown(key)) => { - worker_pool.shutdown(&key); + } - if let Some(token) = token { - if token.inbound.is_cancelled() && worker_pool.user_workers.is_empty() { - token.outbound.cancel(); + msg = user_worker_msgs_rx.recv() => { + match msg { + None => break, + Some(UserWorkerMsgs::Create(worker_options, tx)) => { + worker_pool.create_user_worker(worker_options, tx, termination_token.as_ref().map(|it| it.child_token())); + } + Some(UserWorkerMsgs::Created(key, profile)) => { + worker_pool.add_user_worker(key, profile); + } + Some(UserWorkerMsgs::SendRequest(key, req, res_tx, conn_watch)) => { + worker_pool.send_request(&key, req, res_tx, conn_watch); + } + Some(UserWorkerMsgs::Idle(key)) => { + worker_pool.idle(&key); + } + Some(UserWorkerMsgs::Shutdown(key)) => { + worker_pool.shutdown(&key); + + if let Some(token) = token { + if token.inbound.is_cancelled() && worker_pool.user_workers.is_empty() { + token.outbound.cancel(); + } } } } } } } - } - Ok(()) + Ok(()) + } }); - Ok(user_worker_msgs_tx) + Ok((metric_src, user_worker_msgs_tx)) } diff --git a/crates/base/src/rt_worker/worker_pool.rs b/crates/base/src/rt_worker/worker_pool.rs index f31b412de..e2ee0f408 100644 --- a/crates/base/src/rt_worker/worker_pool.rs +++ b/crates/base/src/rt_worker/worker_pool.rs @@ -7,6 +7,7 @@ use hyper::Body; use log::error; use sb_core::conn_sync::ConnSync; use sb_core::util::sync::AtomicFlag; +use sb_core::SharedMetricSource; use sb_workers::context::{ CreateUserWorkerResult, SendRequestResult, Timing, TimingStatus, UserWorkerMsgs, UserWorkerProfile, WorkerContextInitOpts, WorkerRuntimeOpts, @@ -203,6 +204,7 @@ impl ActiveWorkerRegistry { // send_request is called with UUID pub struct WorkerPool { pub policy: WorkerPoolPolicy, + pub metric_src: SharedMetricSource, pub user_workers: HashMap, pub active_workers: HashMap, pub worker_pool_msgs_tx: mpsc::UnboundedSender, @@ -214,11 +216,13 @@ pub struct WorkerPool { impl WorkerPool { pub(crate) fn new( policy: WorkerPoolPolicy, + metric_src: SharedMetricSource, worker_event_sender: Option>, worker_pool_msgs_tx: mpsc::UnboundedSender, ) -> Self { Self { policy, + metric_src, worker_event_sender, user_workers: HashMap::new(), active_workers: HashMap::new(), @@ -404,7 +408,7 @@ impl WorkerPool { match create_worker((worker_options, supervisor_policy, termination_token.clone())) .await { - Ok(worker_request_msg_tx) => { + Ok((_, worker_request_msg_tx)) => { let profile = UserWorkerProfile { worker_request_msg_tx, timing_tx_pair: (req_start_timing_tx, req_end_timing_tx), @@ -447,6 +451,7 @@ impl WorkerPool { .insert(WorkerId(key, self.policy.supervisor_policy.is_per_worker())); self.user_workers.insert(key, profile); + self.metric_src.incl_active_user_workers(); } pub fn send_request( @@ -552,6 +557,8 @@ impl WorkerPool { }; let _ = notify_tx.send(None); + + self.metric_src.decl_active_user_workers(); } fn retire(&mut self, key: &Uuid) { @@ -570,6 +577,7 @@ impl WorkerPool { if registry.workers.contains(key) { registry.workers.remove(key); + self.metric_src.incl_retired_user_worker(); } } } diff --git a/crates/base/src/server.rs b/crates/base/src/server.rs index 2003b71f2..b2a8a3c98 100644 --- a/crates/base/src/server.rs +++ b/crates/base/src/server.rs @@ -8,7 +8,8 @@ use futures_util::Stream; use hyper::{server::conn::Http, service::Service, Body, Request, Response}; use log::{debug, error, info}; use sb_core::conn_sync::ConnSync; -use sb_workers::context::WorkerRequestMsg; +use sb_core::SharedMetricSource; +use sb_workers::context::{MainWorkerRuntimeOpts, WorkerRequestMsg}; use std::future::Future; use std::net::IpAddr; use std::net::Ipv4Addr; @@ -57,15 +58,20 @@ impl Stream for NotifyOnEos { } struct WorkerService { + metric_src: SharedMetricSource, worker_req_tx: mpsc::UnboundedSender, cancel: CancellationToken, } impl WorkerService { - fn new(worker_req_tx: mpsc::UnboundedSender) -> (Self, CancellationToken) { + fn new( + metric_src: SharedMetricSource, + worker_req_tx: mpsc::UnboundedSender, + ) -> (Self, CancellationToken) { let cancel = CancellationToken::new(); ( Self { + metric_src, worker_req_tx, cancel: cancel.clone(), }, @@ -86,6 +92,7 @@ impl Service> for WorkerService { fn call(&mut self, req: Request) -> Self::Future { // create a response in a future. let cancel = self.cancel.child_token(); + let metric_src = self.metric_src.clone(); let worker_req_tx = self.worker_req_tx.clone(); let fut = async move { let (res_tx, res_rx) = oneshot::channel::, hyper::Error>>(); @@ -99,12 +106,16 @@ impl Service> for WorkerService { }; worker_req_tx.send(msg)?; + metric_src.incl_received_requests(); tokio::spawn({ + let metric_src_inner = metric_src.clone(); let cancel = cancel.clone(); + async move { tokio::select! { _ = cancel.cancelled() => { + metric_src_inner.incl_handled_requests(); if let Err(ex) = ob_conn_watch_tx.send(ConnSync::Recv) { error!("can't update connection watcher: {}", ex.to_string()); } @@ -116,7 +127,15 @@ impl Service> for WorkerService { } }); - let res = match res_rx.await? { + let res = match res_rx.await { + Ok(res) => res, + Err(err) => { + metric_src.incl_handled_requests(); + return Err(err.into()); + } + }; + + let res = match res { Ok(res) => res, Err(e) => { error!( @@ -164,6 +183,7 @@ pub struct Server { main_worker_req_tx: mpsc::UnboundedSender, callback_tx: Option>, termination_token: TerminationToken, + metric_src: SharedMetricSource, } impl Server { @@ -180,17 +200,17 @@ impl Server { entrypoints: WorkerEntrypoints, termination_token: Option, ) -> Result { - let mut worker_events_sender: Option> = None; + let mut worker_events_tx: Option> = None; let maybe_events_entrypoint = entrypoints.events; let maybe_main_entrypoint = entrypoints.main; let termination_token = termination_token.unwrap_or_default(); // Create Event Worker - if let Some(events_service_path) = maybe_events_service_path { + let event_worker_metric_src = if let Some(events_service_path) = maybe_events_service_path { let events_path = Path::new(&events_service_path); let events_path_buf = events_path.to_path_buf(); - let events_worker = create_events_worker( + let (event_worker_metric, sender) = create_events_worker( events_path_buf, import_map_path.clone(), no_module_cache, @@ -199,13 +219,16 @@ impl Server { ) .await?; - worker_events_sender = Some(events_worker); - } + worker_events_tx = Some(sender); + Some(event_worker_metric) + } else { + None + }; // Create a user worker pool - let user_worker_msgs_tx = create_user_worker_pool( + let (shared_metric_src, worker_pool_tx) = create_user_worker_pool( maybe_user_worker_policy.unwrap_or_default(), - worker_events_sender, + worker_events_tx, None, ) .await?; @@ -216,7 +239,11 @@ impl Server { main_worker_path, import_map_path.clone(), no_module_cache, - user_worker_msgs_tx, + MainWorkerRuntimeOpts { + worker_pool_tx, + shared_metric_src: Some(shared_metric_src.clone()), + event_worker_metric_src, + }, maybe_main_entrypoint, Some(termination_token.child_token()), ) @@ -229,6 +256,7 @@ impl Server { main_worker_req_tx, callback_tx, termination_token, + metric_src: shared_metric_src, }) } @@ -253,6 +281,7 @@ impl Server { loop { let main_worker_req_tx = self.main_worker_req_tx.clone(); + let metric_src = self.metric_src.clone(); tokio::select! { msg = listener.accept() => { @@ -261,7 +290,7 @@ impl Server { tokio::task::spawn({ let event_tx = event_tx.clone(); async move { - let (service, cancel) = WorkerService::new(main_worker_req_tx); + let (service, cancel) = WorkerService::new(metric_src, main_worker_req_tx); let _guard = cancel.drop_guard(); let conn_fut = Http::new() diff --git a/crates/base/src/utils/integration_test_helper.rs b/crates/base/src/utils/integration_test_helper.rs index be7ea6bb1..cf7cdf0ac 100644 --- a/crates/base/src/utils/integration_test_helper.rs +++ b/crates/base/src/utils/integration_test_helper.rs @@ -192,7 +192,7 @@ impl TestBedBuilder { } pub async fn build(self) -> TestBed { - let (worker_pool_msg_tx, pool_termination_token) = { + let ((_, worker_pool_tx), pool_termination_token) = { let token = TerminationToken::new(); ( create_user_worker_pool( @@ -218,12 +218,14 @@ impl TestBedBuilder { maybe_entrypoint: None, maybe_module_code: None, conf: WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { - worker_pool_tx: worker_pool_msg_tx, + worker_pool_tx, + shared_metric_src: None, + event_worker_metric_src: None, }), }; let main_termination_token = TerminationToken::new(); - let main_worker_msg_tx = + let (_, main_worker_msg_tx) = create_worker((main_worker_init_opts, main_termination_token.clone())) .await .unwrap(); @@ -293,20 +295,24 @@ pub async fn create_test_user_worker>( ..Default::default() }); - Ok(( - create_worker( + Ok({ + let (_, sender) = create_worker( opts.with_policy(policy) .with_termination_token(termination_token.clone()), ) - .await?, - RequestScope { - policy, - req_start_tx, - req_end_tx, - termination_token, - conn: (Some(conn_tx), conn_rx), - }, - )) + .await?; + + ( + sender, + RequestScope { + policy, + req_start_tx, + req_end_tx, + termination_token, + conn: (Some(conn_tx), conn_rx), + }, + ) + }) } pub fn test_user_worker_pool_policy() -> WorkerPoolPolicy { diff --git a/crates/base/tests/linux_pku_sigsegv_test.rs b/crates/base/tests/linux_pku_sigsegv_test.rs index 2205b6554..df1abd626 100644 --- a/crates/base/tests/linux_pku_sigsegv_test.rs +++ b/crates/base/tests/linux_pku_sigsegv_test.rs @@ -21,7 +21,7 @@ async fn test_not_trigger_pku_sigsegv_due_to_jit_compilation_non_cli() { let main_termination_token = TerminationToken::new(); // create a user worker pool - let user_worker_msgs_tx = create_user_worker_pool( + let (_, worker_pool_tx) = create_user_worker_pool( integration_test_helper::test_user_worker_pool_policy(), None, Some(pool_termination_token.clone()), @@ -40,11 +40,13 @@ async fn test_not_trigger_pku_sigsegv_due_to_jit_compilation_non_cli() { maybe_entrypoint: None, maybe_module_code: None, conf: WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { - worker_pool_tx: user_worker_msgs_tx, + worker_pool_tx, + shared_metric_src: None, + event_worker_metric_src: None, }), }; - let worker_req_tx = create_worker((opts, main_termination_token.clone())) + let (_, worker_req_tx) = create_worker((opts, main_termination_token.clone())) .await .unwrap(); diff --git a/crates/base/tests/main_worker_tests.rs b/crates/base/tests/main_worker_tests.rs index ae2be30ac..a6035b3d8 100644 --- a/crates/base/tests/main_worker_tests.rs +++ b/crates/base/tests/main_worker_tests.rs @@ -111,7 +111,7 @@ async fn test_main_worker_boot_error() { let main_termination_token = TerminationToken::new(); // create a user worker pool - let user_worker_msgs_tx = create_user_worker_pool( + let (_, worker_pool_tx) = create_user_worker_pool( test_user_worker_pool_policy(), None, Some(pool_termination_token.clone()), @@ -130,7 +130,9 @@ async fn test_main_worker_boot_error() { maybe_entrypoint: None, maybe_module_code: None, conf: WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { - worker_pool_tx: user_worker_msgs_tx, + worker_pool_tx, + shared_metric_src: None, + event_worker_metric_src: None, }), }; diff --git a/crates/sb_core/Cargo.toml b/crates/sb_core/Cargo.toml index cb3d8d922..c0c94c9d9 100644 --- a/crates/sb_core/Cargo.toml +++ b/crates/sb_core/Cargo.toml @@ -47,4 +47,5 @@ encoding_rs = { version = "=0.8.33" } base64.workspace = true futures.workspace = true percent-encoding.workspace = true -scopeguard.workspace = true \ No newline at end of file +scopeguard.workspace = true +enum-as-inner.workspace = true \ No newline at end of file diff --git a/crates/sb_core/js/main_worker.js b/crates/sb_core/js/main_worker.js index 12b558663..a92da55fd 100644 --- a/crates/sb_core/js/main_worker.js +++ b/crates/sb_core/js/main_worker.js @@ -1,10 +1,14 @@ -import { SUPABASE_USER_WORKERS } from 'ext:sb_user_workers/user_workers.js'; -import { applyWatcherRid } from 'ext:sb_core_main_js/js/http.js'; +import { SUPABASE_USER_WORKERS } from "ext:sb_user_workers/user_workers.js"; +import { applyWatcherRid } from "ext:sb_core_main_js/js/http.js"; +import { core } from "ext:core/mod.js"; + +const ops = core.ops; Object.defineProperty(globalThis, 'EdgeRuntime', { get() { return { userWorkers: SUPABASE_USER_WORKERS, + getRuntimeMetrics: () => /* async */ ops.op_runtime_metrics(), applyConnectionWatcher: (src, dest) => { applyWatcherRid(src, dest); } diff --git a/crates/sb_core/lib.rs b/crates/sb_core/lib.rs index dfd76fb01..4bed74689 100644 --- a/crates/sb_core/lib.rs +++ b/crates/sb_core/lib.rs @@ -1,6 +1,18 @@ +use std::cell::RefCell; +use std::rc::Rc; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + use deno_core::error::AnyError; -use deno_core::op2; +use deno_core::v8::IsolateHandle; use deno_core::OpState; +use deno_core::{op2, JsRuntime}; +use enum_as_inner::EnumAsInner; +use futures::task::AtomicWaker; +use futures::FutureExt; +use log::error; +use serde::Serialize; +use tokio::sync::oneshot; pub mod auth_tokens; pub mod cache; @@ -17,6 +29,213 @@ pub mod runtime; pub mod transpiler; pub mod util; +#[derive(Debug, Default, Clone)] +pub struct SharedMetricSource { + active_user_workers: Arc, + retired_user_workers: Arc, + received_requests: Arc, + handled_requests: Arc, +} + +impl SharedMetricSource { + pub fn incl_active_user_workers(&self) { + self.active_user_workers.fetch_add(1, Ordering::Relaxed); + } + + pub fn decl_active_user_workers(&self) { + self.active_user_workers.fetch_sub(1, Ordering::Relaxed); + } + + pub fn incl_retired_user_worker(&self) { + self.retired_user_workers.fetch_add(1, Ordering::Relaxed); + } + + pub fn incl_received_requests(&self) { + self.received_requests.fetch_add(1, Ordering::Relaxed); + } + + pub fn incl_handled_requests(&self) { + self.handled_requests.fetch_add(1, Ordering::Relaxed); + } + + pub fn reset(&self) { + self.active_user_workers.store(0, Ordering::Relaxed); + self.retired_user_workers.store(0, Ordering::Relaxed); + self.received_requests.store(0, Ordering::Relaxed); + self.handled_requests.store(0, Ordering::Relaxed); + } +} + +#[derive(Debug, Clone, EnumAsInner)] +pub enum MetricSource { + Worker(WorkerMetricSource), + Runtime(RuntimeMetricSource), +} + +#[derive(Debug, Clone)] +pub struct WorkerMetricSource { + handle: IsolateHandle, + waker: Arc, +} + +impl From<&mut JsRuntime> for WorkerMetricSource { + fn from(value: &mut JsRuntime) -> Self { + Self::from_js_runtime(value) + } +} + +impl WorkerMetricSource { + pub fn from_js_runtime(runtime: &mut JsRuntime) -> Self { + let handle = runtime.v8_isolate().thread_safe_handle(); + let waker = { + let state = runtime.op_state(); + let state_mut = state.borrow_mut(); + + state_mut.waker.clone() + }; + + Self { handle, waker } + } +} + +#[derive(Debug, Clone)] +pub struct RuntimeMetricSource { + pub main: WorkerMetricSource, + pub event: Option, + pub shared: SharedMetricSource, +} + +impl RuntimeMetricSource { + pub fn new( + main: WorkerMetricSource, + maybe_event: Option, + maybe_shared: Option, + ) -> Self { + Self { + main, + event: maybe_event, + shared: maybe_shared.unwrap_or_default(), + } + } + + async fn get_heap_statistics(&mut self) -> RuntimeHeapStatistics { + #[repr(C)] + struct InterruptData { + heap_tx: oneshot::Sender, + } + + extern "C" fn interrupt_fn( + isolate: &mut deno_core::v8::Isolate, + data: *mut std::ffi::c_void, + ) { + let arg = unsafe { Box::::from_raw(data as *mut _) }; + let mut v8_heap_stats = deno_core::v8::HeapStatistics::default(); + let mut worker_heap_stats = WorkerHeapStatistics::default(); + + isolate.get_heap_statistics(&mut v8_heap_stats); + + worker_heap_stats.total_heap_size = v8_heap_stats.total_heap_size(); + worker_heap_stats.total_heap_executable = v8_heap_stats.total_heap_size_executable(); + worker_heap_stats.total_physical_size = v8_heap_stats.total_physical_size(); + worker_heap_stats.total_available_size = v8_heap_stats.total_available_size(); + worker_heap_stats.total_global_handles_size = v8_heap_stats.total_global_handles_size(); + worker_heap_stats.used_global_handles_size = v8_heap_stats.used_global_handles_size(); + worker_heap_stats.used_heap_size = v8_heap_stats.used_heap_size(); + worker_heap_stats.malloced_memory = v8_heap_stats.malloced_memory(); + worker_heap_stats.external_memory = v8_heap_stats.external_memory(); + worker_heap_stats.peak_malloced_memory = v8_heap_stats.peak_malloced_memory(); + + if let Err(err) = arg.heap_tx.send(worker_heap_stats) { + error!("failed to send worker heap statistics: {:?}", err); + } + } + + let request_heap_statistics_fn = |arg: Option<&mut WorkerMetricSource>| { + let Some(source) = arg else { + return async { None:: }.boxed(); + }; + + let (tx, rx) = oneshot::channel::(); + let data_ptr_mut = Box::into_raw(Box::new(InterruptData { heap_tx: tx })); + + if !source + .handle + .request_interrupt(interrupt_fn, data_ptr_mut as *mut std::ffi::c_void) + { + drop(unsafe { Box::from_raw(data_ptr_mut) }); + return async { None }.boxed(); + } + + let waker = source.waker.clone(); + + async move { + waker.wake(); + rx.await.ok() + } + .boxed() + }; + + RuntimeHeapStatistics { + main_worker_heap_stats: request_heap_statistics_fn(Some(&mut self.main)) + .await + .unwrap_or_default(), + + event_worker_heap_stats: request_heap_statistics_fn(self.event.as_mut()).await, + } + } +} + +#[derive(Debug, Serialize, Default)] +#[serde(rename_all = "camelCase")] +struct WorkerHeapStatistics { + total_heap_size: usize, + total_heap_executable: usize, + total_physical_size: usize, + total_available_size: usize, + total_global_handles_size: usize, + used_global_handles_size: usize, + used_heap_size: usize, + malloced_memory: usize, + external_memory: usize, + peak_malloced_memory: usize, +} + +#[derive(Debug, Serialize, Default)] +#[serde(rename_all = "camelCase")] +struct RuntimeHeapStatistics { + main_worker_heap_stats: WorkerHeapStatistics, + event_worker_heap_stats: Option, +} + +#[derive(Debug, Serialize, Default)] +#[serde(rename_all = "camelCase")] +struct RuntimeSharedStatistics { + active_user_workers_count: usize, + retired_user_workers_count: usize, + received_requests_count: usize, + handled_requests_count: usize, +} + +impl RuntimeSharedStatistics { + fn from_shared_metric_src(src: &SharedMetricSource) -> Self { + Self { + active_user_workers_count: src.active_user_workers.load(Ordering::Relaxed), + retired_user_workers_count: src.retired_user_workers.load(Ordering::Relaxed), + received_requests_count: src.received_requests.load(Ordering::Relaxed), + handled_requests_count: src.handled_requests.load(Ordering::Relaxed), + } + } +} + +#[derive(Debug, Serialize, Default)] +#[serde(rename_all = "camelCase")] +struct RuntimeMetrics { + #[serde(flatten)] + heap_stats: RuntimeHeapStatistics, + #[serde(flatten)] + shared_stats: RuntimeSharedStatistics, +} + #[op2(fast)] fn op_is_terminal(state: &mut OpState, rid: u32) -> Result { let handle = state.resource_table.get_handle(rid)?; @@ -33,6 +252,22 @@ fn op_console_size(_state: &mut OpState, #[buffer] _result: &mut [u32]) -> Resul Ok(()) } +#[op2(async)] +#[serde] +async fn op_runtime_metrics(state: Rc>) -> Result { + let mut runtime_metrics = RuntimeMetrics::default(); + let mut runtime_metric_src = { + let state = state.borrow(); + state.borrow::().clone() + }; + + runtime_metrics.heap_stats = runtime_metric_src.get_heap_statistics().await; + runtime_metrics.shared_stats = + RuntimeSharedStatistics::from_shared_metric_src(&runtime_metric_src.shared); + + Ok(runtime_metrics) +} + #[op2] #[string] pub fn op_read_line_prompt( @@ -54,7 +289,8 @@ deno_core::extension!( op_stdin_set_raw, op_console_size, op_read_line_prompt, - op_set_exit_code + op_set_exit_code, + op_runtime_metrics ], esm_entry_point = "ext:sb_core_main_js/js/bootstrap.js", esm = [ diff --git a/crates/sb_workers/context.rs b/crates/sb_workers/context.rs index f645650b4..371200f88 100644 --- a/crates/sb_workers/context.rs +++ b/crates/sb_workers/context.rs @@ -5,6 +5,7 @@ use event_worker::events::WorkerEventWithMetadata; use hyper::{Body, Request, Response}; use sb_core::conn_sync::ConnSync; use sb_core::util::sync::AtomicFlag; +use sb_core::{MetricSource, SharedMetricSource}; use std::path::PathBuf; use std::sync::atomic::AtomicUsize; use std::{collections::HashMap, sync::Arc}; @@ -75,6 +76,8 @@ pub struct UserWorkerProfile { #[derive(Debug, Clone)] pub struct MainWorkerRuntimeOpts { pub worker_pool_tx: mpsc::UnboundedSender, + pub shared_metric_src: Option, + pub event_worker_metric_src: Option, } #[derive(Debug, Clone)] @@ -87,6 +90,29 @@ pub enum WorkerRuntimeOpts { EventsWorker(EventWorkerRuntimeOpts), } +impl WorkerRuntimeOpts { + pub fn to_worker_kind(&self) -> WorkerKind { + match self { + Self::UserWorker(_) => WorkerKind::UserWorker, + Self::MainWorker(_) => WorkerKind::MainWorker, + Self::EventsWorker(_) => WorkerKind::EventsWorker, + } + } +} + +#[derive(Debug, Clone, Copy, EnumAsInner, PartialEq, Eq)] +pub enum WorkerKind { + UserWorker, + MainWorker, + EventsWorker, +} + +impl From<&WorkerRuntimeOpts> for WorkerKind { + fn from(value: &WorkerRuntimeOpts) -> Self { + value.to_worker_kind() + } +} + #[derive(Debug, Clone, Default)] pub struct TimingStatus { pub demand: Arc, diff --git a/examples/main/index.ts b/examples/main/index.ts index a12f2d12b..6d04130f8 100644 --- a/examples/main/index.ts +++ b/examples/main/index.ts @@ -14,6 +14,11 @@ serve(async (req: Request) => { ); } + if (pathname === '/_internal/metric') { + const metric = await EdgeRuntime.getRuntimeMetrics(); + return Response.json(metric); + } + const path_parts = pathname.split('/'); const service_name = path_parts[1];