-
Notifications
You must be signed in to change notification settings - Fork 51
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ZTC-1648: Avoid heap profiling crash by eagerly starting long-lived profiling thread #54
Changes from 1 commit
3977538
634ff4d
4e38e56
e5a25ed
a94d8f4
1757ced
3abd233
19c3933
e446b46
e852a05
8891073
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,16 +1,19 @@ | ||
use super::settings::MemoryProfilerSettings; | ||
use crate::utils::feature_use; | ||
use crate::{BootstrapError, BootstrapResult, Result}; | ||
use anyhow::anyhow; | ||
use anyhow::bail; | ||
use once_cell::sync::{Lazy, OnceCell}; | ||
use std::ffi::{CStr, CString}; | ||
use std::fs::File; | ||
use std::io::Read; | ||
use std::os::raw::c_char; | ||
use std::thread; | ||
use std::sync::mpsc::{self}; | ||
use std::sync::{Arc, Mutex}; | ||
use std::thread::{spawn, JoinHandle}; | ||
use std::time::Duration; | ||
use tempfile::NamedTempFile; | ||
use tokio::sync::Mutex as AsyncMutex; | ||
use tokio::task::spawn_blocking; | ||
use tokio::sync::{oneshot, Mutex as AsyncMutex}; | ||
|
||
feature_use!(cfg(feature = "security"), { | ||
use crate::security::common_syscall_allow_lists::SERVICE_BASICS; | ||
|
@@ -56,12 +59,12 @@ struct Seal; | |
/// A safe interface for [jemalloc]'s memory profiling functionality. | ||
/// | ||
/// [jemalloc]: https://github.com/jemalloc/jemalloc | ||
#[derive(Copy, Clone)] | ||
#[derive(Clone)] | ||
pub struct MemoryProfiler { | ||
_seal: Seal, | ||
|
||
#[cfg(feature = "security")] | ||
sandbox_profiling_syscalls: bool, | ||
request_heap_profile: mpsc::Sender<oneshot::Sender<anyhow::Result<String>>>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
_heap_profiling_thread_handle: Arc<Mutex<JoinHandle<()>>>, | ||
OmegaJak marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
impl MemoryProfiler { | ||
|
@@ -83,7 +86,7 @@ impl MemoryProfiler { | |
|
||
PROFILER | ||
.get_or_try_init(|| init_profiler(settings)) | ||
.copied() | ||
.cloned() | ||
} | ||
|
||
/// Returns a heap profile. | ||
|
@@ -115,24 +118,10 @@ impl MemoryProfiler { | |
return Err("profiling is already in progress".into()); | ||
}; | ||
|
||
#[cfg(feature = "security")] | ||
let sandbox_profiling_syscalls = self.sandbox_profiling_syscalls; | ||
let (response_sender, response_receiver) = oneshot::channel(); | ||
self.request_heap_profile.send(response_sender)?; | ||
|
||
let collector_thread = thread::spawn(move || { | ||
#[cfg(feature = "security")] | ||
if sandbox_profiling_syscalls { | ||
sandbox_jemalloc_syscalls()?; | ||
} | ||
|
||
collect_heap_profile() | ||
}); | ||
|
||
spawn_blocking(move || { | ||
collector_thread | ||
.join() | ||
.map_err(|_| "heap profile collector thread panicked")? | ||
}) | ||
.await? | ||
Ok(response_receiver.await??) | ||
} | ||
|
||
/// Returns heap statistics. | ||
|
@@ -169,6 +158,24 @@ fn init_profiler(settings: &MemoryProfilerSettings) -> BootstrapResult<Option<Me | |
return Ok(None); | ||
} | ||
|
||
let (request_sender, request_receiver) = mpsc::channel(); | ||
|
||
#[cfg(feature = "security")] | ||
let (setup_complete_sender, setup_complete_receiver) = mpsc::channel(); | ||
|
||
let sandbox_profiling_syscalls = settings.sandbox_profiling_syscalls; | ||
let heap_profile_thread_handle = spawn(move || { | ||
heap_profile_thread( | ||
request_receiver, | ||
#[cfg(feature = "security")] | ||
setup_complete_sender, | ||
sandbox_profiling_syscalls, | ||
) | ||
}); | ||
|
||
#[cfg(feature = "security")] | ||
receive_profiling_thread_setup_msg(setup_complete_receiver)?; | ||
|
||
control::write(control::BACKGROUND_THREAD, true).map_err(|e| { | ||
BootstrapError::new(e).context("failed to activate background thread collection") | ||
})?; | ||
|
@@ -182,26 +189,67 @@ fn init_profiler(settings: &MemoryProfilerSettings) -> BootstrapResult<Option<Me | |
Ok(Some(MemoryProfiler { | ||
_seal: Seal, | ||
|
||
#[cfg(feature = "security")] | ||
sandbox_profiling_syscalls: settings.sandbox_profiling_syscalls, | ||
request_heap_profile: request_sender, | ||
_heap_profiling_thread_handle: Arc::new(Mutex::new(heap_profile_thread_handle)), | ||
})) | ||
} | ||
|
||
fn collect_heap_profile() -> Result<String> { | ||
#[cfg(feature = "security")] | ||
fn receive_profiling_thread_setup_msg( | ||
setup_complete_receiver: mpsc::Receiver<anyhow::Result<()>>, | ||
) -> anyhow::Result<()> { | ||
use std::sync::mpsc::RecvTimeoutError; | ||
|
||
const PROFILING_THREAD_SETUP_TIMEOUT: Duration = Duration::from_secs(2); | ||
match setup_complete_receiver.recv_timeout(PROFILING_THREAD_SETUP_TIMEOUT) { | ||
Ok(Ok(())) => {} | ||
Ok(Err(setup_err)) => { | ||
return Err(setup_err); | ||
} | ||
Err(RecvTimeoutError::Timeout) => bail!("Profiling thread did not finish setup in time"), | ||
Err(RecvTimeoutError::Disconnected) => { | ||
bail!("Profiling thread disconnected before finishing setup") | ||
OmegaJak marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
Ok(()) | ||
} | ||
|
||
fn heap_profile_thread( | ||
receive_request: mpsc::Receiver<oneshot::Sender<anyhow::Result<String>>>, | ||
#[cfg(feature = "security")] setup_complete: mpsc::Sender<anyhow::Result<()>>, | ||
sandbox_profiling_syscalls: bool, | ||
) { | ||
#[cfg(feature = "security")] | ||
if sandbox_profiling_syscalls { | ||
if let Err(_) = setup_complete.send(sandbox_jemalloc_syscalls()) { | ||
return; | ||
OmegaJak marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
while let Ok(send_response) = receive_request.recv() { | ||
if let Err(_) = send_response.send(collect_heap_profile()) { | ||
break; | ||
} | ||
} | ||
} | ||
|
||
fn collect_heap_profile() -> anyhow::Result<String> { | ||
let out_file = NamedTempFile::new()?; | ||
|
||
let out_file_path = out_file | ||
.path() | ||
.to_str() | ||
.ok_or("failed to obtain heap profile output file path")?; | ||
.ok_or(anyhow!("failed to obtain heap profile output file path"))?; | ||
|
||
let mut out_file_path_c_str = CString::new(out_file_path)?.into_bytes_with_nul(); | ||
let out_file_path_ptr = out_file_path_c_str.as_mut_ptr() as *mut c_char; | ||
|
||
control::write(control::PROF_DUMP, out_file_path_ptr).map_err(|e| { | ||
format!( | ||
anyhow!( | ||
"failed to dump jemalloc heap profile to {:?}: {}", | ||
out_file_path, e | ||
out_file_path, | ||
e | ||
) | ||
})?; | ||
|
||
|
@@ -213,7 +261,7 @@ fn collect_heap_profile() -> Result<String> { | |
} | ||
|
||
#[cfg(feature = "security")] | ||
fn sandbox_jemalloc_syscalls() -> Result<()> { | ||
fn sandbox_jemalloc_syscalls() -> anyhow::Result<()> { | ||
#[cfg(target_arch = "x86_64")] | ||
allow_list! { | ||
static ALLOWED_SYSCALLS = [ | ||
|
@@ -244,7 +292,9 @@ fn sandbox_jemalloc_syscalls() -> Result<()> { | |
#[cfg(test)] | ||
mod tests { | ||
use super::*; | ||
use crate::telemetry::settings::MemoryProfilerSettings; | ||
use crate::{ | ||
security::common_syscall_allow_lists::ASYNC, telemetry::settings::MemoryProfilerSettings, | ||
}; | ||
|
||
#[test] | ||
fn sample_interval_out_of_bounds() { | ||
|
@@ -256,6 +306,30 @@ mod tests { | |
.is_err()); | ||
} | ||
|
||
#[tokio::test] | ||
async fn profile_heap_with_profiling_sandboxed_after_previous_seccomp_init() { | ||
OmegaJak marked this conversation as resolved.
Show resolved
Hide resolved
|
||
let profiler = MemoryProfiler::get_or_init_with(&MemoryProfilerSettings { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. As far as I can tell this will still fail. It seems the way to solve that is to start the profiling thread during initialization. The task sender for the thread will be stored in a global var (like the profiler itself), so once we have the profiler initialized, it can tell the thread to collect a profile via a global sender. Additionally, we can put the sender under a mutex; this way we can remove [inline code omitted in page capture]. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. With my changes, the [inline code omitted in page capture]. Regarding the locking -- if the profiler itself is stored in a global variable, and the only way to get the profiler is through the global variable, I'm not sure I see the point of pulling the sender into its own global variable rather than letting it continue living inside the profiler. As far as the [inline code omitted in page capture]. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I've added docs and removed the [inline code omitted in page capture]. Apache Bench Output
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Even though having multiple profiles can be fine in artificial benchmarks, we don't want to run multiple profiles at a time on a heavily loaded server, as it: 1) can have an effect on the performance of the server; 2) due to 1, can produce skewed results for other profiles, e.g. a timing discrepancy can cause pacing issues and introduce additional allocations; 3) due to the way our profiling pipeline is built (i.e. profiles are available globally once they are collected), we want to avoid doing the same job multiple times and just allow others to grab a profile that was already collected by someone else in the time period when they requested theirs. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
I see. Let's also add docs for those who might use the profiler programmatically outside of the telemetry server, noting that if they have seccomp enabled, it's recommended to init the profiler before enabling it. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
We don't run multiple profiles at a time. Requests enter a queue, which the profiling thread processes one at a time.
Sure, we could do something like "if we receive profiling request A, and then request B comes in while we're still gathering A's, then return to B what we returned to A". But that seems like a premature optimization with unneeded complexity to me. If you're opposed to the queuing functionality and want to match how it worked before my changes, we could avoid it by reintroducing a lock/mutex around the sender and returning an error if we can't immediately acquire the lock. While that does avoid possible server overload due to profiling requests with a sort of rate limiting of "only one at a time", it feels like an unnecessary limitation of the server's functionality. If we do that, and user B requests a profile while user A's is still running, user B will just get an error and have to try again, instead of (as it currently is) getting their profile result a few milliseconds later when A's is done. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Don't really want to allow queuing profiles: we either need to have a sophisticated way to control and monitor the queue (e.g. metrics for queue, duration, a way to cancel the queue, etc.) or just keep things simple as they were. Some of the applications are extremely loaded and hypersensitive to performance changes, and at the same time anyone really can request a profile. And I imagine people erroneously queuing profiles multiple times. And the only way we can abort such a queue is via a service restart. Then, also, if people request profiles, they probably run a certain experiment: making requests/connections, etc. So, the timing is important; running a profile without a timing guarantee of when it will be run is not very useful. To make matters worse, we don't even give any feedback signal to the requester about whether the profile will be run immediately or queued. So, tl;dr, yes, let's re-introduce the lock. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Lock re-introduced. |
||
enabled: true, | ||
sandbox_profiling_syscalls: true, | ||
..Default::default() | ||
}) | ||
.unwrap() | ||
.unwrap_or_else(|| { | ||
panic!("profiling should be enabled via `_RJEM_MALLOC_CONF=prof:true` env var"); | ||
}); | ||
|
||
allow_list! { | ||
static ALLOW_PROFILING = [ | ||
..SERVICE_BASICS, | ||
..ASYNC | ||
] | ||
} | ||
enable_syscall_sandboxing(ViolationAction::KillProcess, &ALLOW_PROFILING).unwrap(); | ||
|
||
let profile = profiler.heap_profile().await.unwrap(); | ||
assert!(!profile.is_empty()); | ||
} | ||
|
||
// NOTE: `heap_profile` uses raw pointers, the test ensures that it doesn't affect the returned future | ||
fn _assert_heap_profile_fut_is_send() { | ||
fn is_send<T: Send>(_t: T) {} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can now remove the `Seal` above, as `MemoryProfiler` now always has one private field regardless of the feature set and, thus, can't be constructed by external code. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, thank you! Removed.