Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZTC-1648: Avoid heap profiling crash by eagerly starting long-lived profiling thread #54

Merged
138 changes: 106 additions & 32 deletions foundations/src/telemetry/memory_profiler.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
use super::settings::MemoryProfilerSettings;
use crate::utils::feature_use;
use crate::{BootstrapError, BootstrapResult, Result};
use anyhow::anyhow;
use anyhow::bail;
use once_cell::sync::{Lazy, OnceCell};
use std::ffi::{CStr, CString};
use std::fs::File;
use std::io::Read;
use std::os::raw::c_char;
use std::thread;
use std::sync::mpsc::{self};
use std::sync::{Arc, Mutex};
use std::thread::{spawn, JoinHandle};
use std::time::Duration;
use tempfile::NamedTempFile;
use tokio::sync::Mutex as AsyncMutex;
use tokio::task::spawn_blocking;
use tokio::sync::{oneshot, Mutex as AsyncMutex};

feature_use!(cfg(feature = "security"), {
use crate::security::common_syscall_allow_lists::SERVICE_BASICS;
Expand Down Expand Up @@ -56,12 +59,12 @@ struct Seal;
/// A safe interface for [jemalloc]'s memory profiling functionality.
///
/// [jemalloc]: https://github.com/jemalloc/jemalloc
#[derive(Copy, Clone)]
#[derive(Clone)]
pub struct MemoryProfiler {
_seal: Seal,

#[cfg(feature = "security")]
sandbox_profiling_syscalls: bool,
request_heap_profile: mpsc::Sender<oneshot::Sender<anyhow::Result<String>>>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can now remove the Seal above as MemoryProfiler now always has one private field disregard the feature set and, thus, can't be constructed by the external code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, thank you! Removed.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use crate::Result instead. anyhow::Result aliased BootstrapResult is used only for errors that eventually can terminate the process and are quite heavy on CPU and memory as they also contain stack traces. For operational errors, like this one use crate::Result

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

_heap_profiling_thread_handle: Arc<Mutex<JoinHandle<()>>>,
OmegaJak marked this conversation as resolved.
Show resolved Hide resolved
}

impl MemoryProfiler {
Expand All @@ -83,7 +86,7 @@ impl MemoryProfiler {

PROFILER
.get_or_try_init(|| init_profiler(settings))
.copied()
.cloned()
}

/// Returns a heap profile.
Expand Down Expand Up @@ -115,24 +118,10 @@ impl MemoryProfiler {
return Err("profiling is already in progress".into());
};

#[cfg(feature = "security")]
let sandbox_profiling_syscalls = self.sandbox_profiling_syscalls;
let (response_sender, response_receiver) = oneshot::channel();
self.request_heap_profile.send(response_sender)?;

let collector_thread = thread::spawn(move || {
#[cfg(feature = "security")]
if sandbox_profiling_syscalls {
sandbox_jemalloc_syscalls()?;
}

collect_heap_profile()
});

spawn_blocking(move || {
collector_thread
.join()
.map_err(|_| "heap profile collector thread panicked")?
})
.await?
Ok(response_receiver.await??)
}

/// Returns heap statistics.
Expand Down Expand Up @@ -169,6 +158,24 @@ fn init_profiler(settings: &MemoryProfilerSettings) -> BootstrapResult<Option<Me
return Ok(None);
}

let (request_sender, request_receiver) = mpsc::channel();

#[cfg(feature = "security")]
let (setup_complete_sender, setup_complete_receiver) = mpsc::channel();

let sandbox_profiling_syscalls = settings.sandbox_profiling_syscalls;
let heap_profile_thread_handle = spawn(move || {
heap_profile_thread(
request_receiver,
#[cfg(feature = "security")]
setup_complete_sender,
sandbox_profiling_syscalls,
)
});

#[cfg(feature = "security")]
receive_profiling_thread_setup_msg(setup_complete_receiver)?;

control::write(control::BACKGROUND_THREAD, true).map_err(|e| {
BootstrapError::new(e).context("failed to activate background thread collection")
})?;
Expand All @@ -182,26 +189,67 @@ fn init_profiler(settings: &MemoryProfilerSettings) -> BootstrapResult<Option<Me
Ok(Some(MemoryProfiler {
_seal: Seal,

#[cfg(feature = "security")]
sandbox_profiling_syscalls: settings.sandbox_profiling_syscalls,
request_heap_profile: request_sender,
_heap_profiling_thread_handle: Arc::new(Mutex::new(heap_profile_thread_handle)),
}))
}

fn collect_heap_profile() -> Result<String> {
#[cfg(feature = "security")]
fn receive_profiling_thread_setup_msg(
setup_complete_receiver: mpsc::Receiver<anyhow::Result<()>>,
) -> anyhow::Result<()> {
use std::sync::mpsc::RecvTimeoutError;

const PROFILING_THREAD_SETUP_TIMEOUT: Duration = Duration::from_secs(2);
match setup_complete_receiver.recv_timeout(PROFILING_THREAD_SETUP_TIMEOUT) {
Ok(Ok(())) => {}
Ok(Err(setup_err)) => {
return Err(setup_err);
}
Err(RecvTimeoutError::Timeout) => bail!("Profiling thread did not finish setup in time"),
Err(RecvTimeoutError::Disconnected) => {
bail!("Profiling thread disconnected before finishing setup")
OmegaJak marked this conversation as resolved.
Show resolved Hide resolved
}
}

Ok(())
}

fn heap_profile_thread(
receive_request: mpsc::Receiver<oneshot::Sender<anyhow::Result<String>>>,
#[cfg(feature = "security")] setup_complete: mpsc::Sender<anyhow::Result<()>>,
sandbox_profiling_syscalls: bool,
) {
#[cfg(feature = "security")]
if sandbox_profiling_syscalls {
if let Err(_) = setup_complete.send(sandbox_jemalloc_syscalls()) {
return;
OmegaJak marked this conversation as resolved.
Show resolved Hide resolved
}
}

while let Ok(send_response) = receive_request.recv() {
if let Err(_) = send_response.send(collect_heap_profile()) {
break;
}
}
}

fn collect_heap_profile() -> anyhow::Result<String> {
let out_file = NamedTempFile::new()?;

let out_file_path = out_file
.path()
.to_str()
.ok_or("failed to obtain heap profile output file path")?;
.ok_or(anyhow!("failed to obtain heap profile output file path"))?;

let mut out_file_path_c_str = CString::new(out_file_path)?.into_bytes_with_nul();
let out_file_path_ptr = out_file_path_c_str.as_mut_ptr() as *mut c_char;

control::write(control::PROF_DUMP, out_file_path_ptr).map_err(|e| {
format!(
anyhow!(
"failed to dump jemalloc heap profile to {:?}: {}",
out_file_path, e
out_file_path,
e
)
})?;

Expand All @@ -213,7 +261,7 @@ fn collect_heap_profile() -> Result<String> {
}

#[cfg(feature = "security")]
fn sandbox_jemalloc_syscalls() -> Result<()> {
fn sandbox_jemalloc_syscalls() -> anyhow::Result<()> {
#[cfg(target_arch = "x86_64")]
allow_list! {
static ALLOWED_SYSCALLS = [
Expand Down Expand Up @@ -244,7 +292,9 @@ fn sandbox_jemalloc_syscalls() -> Result<()> {
#[cfg(test)]
mod tests {
use super::*;
use crate::telemetry::settings::MemoryProfilerSettings;
use crate::{
security::common_syscall_allow_lists::ASYNC, telemetry::settings::MemoryProfilerSettings,
};

#[test]
fn sample_interval_out_of_bounds() {
Expand All @@ -256,6 +306,30 @@ mod tests {
.is_err());
}

#[tokio::test]
async fn profile_heap_with_profiling_sandboxed_after_previous_seccomp_init() {
OmegaJak marked this conversation as resolved.
Show resolved Hide resolved
let profiler = MemoryProfiler::get_or_init_with(&MemoryProfilerSettings {
Copy link
Collaborator

@inikulin inikulin Jul 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I can tell this will still fail as MemoryProfiler will usually be initialised in production on a first request to telemetry server for a heap profile and that happens when the whole app is spun up and seccomp is initialised on the main thread.

It seems the way to solve that is to start the profiling thread in telemetry::init that is recommended to be called before seccomp init on the main thread (though, it's not reflected in docs, but shown in the example - probably we should update the docs here).

Task sender for the thread will be stored in a global var (like the profiler itself), so once we have profiler initialized, it can tell the thread to collect a profile via a global sender. Additionally, we can put the sender under a mutex, this way we can remove PROFILING_IN_PROGRESS_LOCK.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With my changes, the MemoryProfiler is initialized when telemetry::server::init is called, which is called by telemetry::init. I'll make sure the recommendation to call telemetry::init before seccomp setup is in the docs.

Regarding the locking -- if the profiler itself is stored in a global variable, and the only way to get the profiler is through the global variable, I'm not sure I see the point of pulling the sender into its own global variable rather than letting it continue living inside the profiler.

As far as the PROFILING_IN_PROGRESS_LOCK goes, looking at it now, I actually don't think we need that any more. If multiple requests come in at the same time, they'll simply queue up and each be processed in turn by the profiling thread's loop. I'll look into removing that altogether.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added docs and removed the PROFILING_IN_PROGRESS_LOCK. As a sanity check, I used apache bench to hit a local version of a service using this branch, making 10k heap profile requests with a concurrency level of 100. High concurrency meant high response times of course, but there were no failures.

Apache Bench Output
$ ab -n 10000 -c 100 -l localhost:7800/pprof/heap
This is ApacheBench, Version 2.3 <$Revision: 1879490 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking localhost (be patient)
Completed 1000 requests
Completed 2000 requests
Completed 3000 requests
Completed 4000 requests
Completed 5000 requests
Completed 6000 requests
Completed 7000 requests
Completed 8000 requests
Completed 9000 requests
Completed 10000 requests
Finished 10000 requests


Server Software:        
Server Hostname:        localhost
Server Port:            7800

Document Path:          /pprof/heap
Document Length:        Variable

Concurrency Level:      100
Time taken for tests:   1.236 seconds
Complete requests:      10000
Failed requests:        0
Total transferred:      88918063 bytes
HTML transferred:       87658063 bytes
Requests per second:    8091.14 [#/sec] (mean)
Time per request:       12.359 [ms] (mean)
Time per request:       0.124 [ms] (mean, across all concurrent requests)
Transfer rate:          70258.63 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.1      0       1
Processing:     1   12   3.1     11      20
Waiting:        1   12   3.1     11      20
Total:          2   12   3.1     11      20

Percentage of the requests served within a certain time (ms)
  50%     11
  66%     13
  75%     14
  80%     15
  90%     17
  95%     19
  98%     19
  99%     19
 100%     20 (longest request)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even though having multiple profiles can be fine in artificial benchmarks, we don't want to run multiple profiles at a time on a heavily loaded server as it: 1) can have effect on the performance of the server; 2) due to 1 can produce skewed results for other profiles, e.g. timing discrepancy can cause pacing issues and introduce additional allocations; 3) due to the way how our profiling pipeline is built (i.e. profiles are available globally once they collected) we want to avoid doing the same job multiple times and just allow others grab a profile that was already collected by someone else at the time period when they requested theirs.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With my changes, the MemoryProfiler is initialized when telemetry::server::init is called,

I see. Let's also add docs for those who might use the profiler programmatically outside of telemetry server that if they have seccomp enabled it's recommended to init the profiler before that.

Copy link
Contributor Author

@OmegaJak OmegaJak Jul 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't want to run multiple profiles at a time on a heavily loaded server

We don't run multiple profiles at a time. Requests enter into a queue which the profiling thread processes one at a time.

due to the way how our profiling pipeline is built (i.e. profiles are available globally once they collected) we want to avoid doing the same job multiple times and just allow others grab a profile that was already collected by someone else at the time period when they requested theirs

Sure, we could do something like "if we receive profiling request A, and then request B comes in while we're still gathering A's, then return to B what we returned to A". But, that seems like a premature optimization with unneeded complexity to me.

If you're opposed to the queuing functionality and want to match how it worked before my changes, we could avoid it by reintroducing a lock/mutex around the sender and returning an error if we can't immediately acquire the lock. While that does avoid possible server overload due to profiling requests with a sort of rate limiting of "only one at a time", it feels like an unnecessary limiting of the server's functionality. If we do that, and user B requests a profile while user A's is still running, user B will just get an error and have to try again, instead of (as it currently is) getting their profile result a few milliseconds later when A's is done.

Copy link
Collaborator

@inikulin inikulin Jul 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't run multiple profiles at a time. Requests enter into a queue which the profiling thread processes one at a time.

Don't really want to allow queuing profiles: we either need to have a sophisticated way to control and monitor the queue (e.g. metrics for queue, duration, a way to cancel the queue, etc.) or just keep the things simple as they were.

Some of the applications are extremely loaded and hypersensitive to performance changes and at the same time anyone really can request a profile. And I imagine people erroneously queuing profiles multiple times. And the only way we can abort such a queue is only via a service restart.

Then, also, if people request profiles, they probably run a certain experiment: making requests/connections, etc. So, the timing is important, running profile without a timing guarantee of when it will be run is not very useful. To make the matter worse, we don't even give any feedback signal to the requester whether profile will be run immediately or queued.

So, tl;dr, yes, let's re-introduce the lock.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lock re-introduced.

enabled: true,
sandbox_profiling_syscalls: true,
..Default::default()
})
.unwrap()
.unwrap_or_else(|| {
panic!("profiling should be enabled via `_RJEM_MALLOC_CONF=prof:true` env var");
});

allow_list! {
static ALLOW_PROFILING = [
..SERVICE_BASICS,
..ASYNC
]
}
enable_syscall_sandboxing(ViolationAction::KillProcess, &ALLOW_PROFILING).unwrap();

let profile = profiler.heap_profile().await.unwrap();
assert!(!profile.is_empty());
}

// NOTE: `heap_profile` uses raw pointers, the test ensures that it doesn't affect the returned future
fn _assert_heap_profile_fut_is_send() {
fn is_send<T: Send>(_t: T) {}
Expand Down
8 changes: 7 additions & 1 deletion foundations/src/telemetry/server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#[cfg(feature = "metrics")]
use super::metrics;
use super::settings::TelemetrySettings;
use crate::BootstrapError;
use crate::{BootstrapResult, Result};
use anyhow::{anyhow, Context as _};
use futures_util::future::BoxFuture;
Expand Down Expand Up @@ -47,6 +48,11 @@ pub(super) fn init(
}

let settings = Arc::new(settings);

// Eagerly init the memory profiler so it gets set up before syscalls are sandboxed with seccomp.
// Otherwise, the profiler may invoke seccomp for itself and violate prior seccomp configuration.
memory_profiling::profiler(Arc::clone(&settings)).map_err(|err| anyhow!(err))?;

let router = create_router(&settings, custom_routes)?;
let addr = settings.server.addr;

Expand Down Expand Up @@ -162,7 +168,7 @@ mod memory_profiling {
use super::*;
use crate::telemetry::MemoryProfiler;

fn profiler(settings: Arc<TelemetrySettings>) -> Result<MemoryProfiler> {
pub(super) fn profiler(settings: Arc<TelemetrySettings>) -> Result<MemoryProfiler> {
MemoryProfiler::get_or_init_with(&settings.memory_profiler)?.ok_or_else(|| {
"profiling should be enabled via `_RJEM_MALLOC_CONF=prof:true` env var".into()
})
Expand Down