ZTC-1648: Avoid heap profiling crash by eagerly starting long-lived profiling thread #54

Merged
136 changes: 58 additions & 78 deletions foundations/src/telemetry/memory_profiler.rs
@@ -1,24 +1,16 @@
use super::settings::MemoryProfilerSettings;
use crate::utils::feature_use;
use crate::{BootstrapError, BootstrapResult, Result};
use anyhow::bail;
use once_cell::sync::{Lazy, OnceCell};
use once_cell::sync::OnceCell;
use std::ffi::{CStr, CString};
use std::fs::File;
use std::io::Read;
use std::os::raw::c_char;
use std::thread;
use std::sync::mpsc::{self};
use tempfile::NamedTempFile;
use tokio::sync::Mutex as AsyncMutex;
use tokio::task::spawn_blocking;

feature_use!(cfg(feature = "security"), {
use crate::security::common_syscall_allow_lists::SERVICE_BASICS;
use crate::security::{allow_list, enable_syscall_sandboxing, ViolationAction};
});
use tokio::sync::oneshot;

static PROFILER: OnceCell<Option<MemoryProfiler>> = OnceCell::new();
static PROFILING_IN_PROGRESS_LOCK: Lazy<AsyncMutex<()>> = Lazy::new(Default::default);

mod control {
use super::*;
@@ -49,19 +41,12 @@ mod control {
}
}

// NOTE: prevent direct construction by the external code.
#[derive(Copy, Clone)]
struct Seal;

/// A safe interface for [jemalloc]'s memory profiling functionality.
///
/// [jemalloc]: https://github.com/jemalloc/jemalloc
#[derive(Copy, Clone)]
#[derive(Clone)]
pub struct MemoryProfiler {
_seal: Seal,

#[cfg(feature = "security")]
sandbox_profiling_syscalls: bool,
request_heap_profile: mpsc::Sender<oneshot::Sender<Result<String>>>,
}

impl MemoryProfiler {
@@ -73,6 +58,9 @@ impl MemoryProfiler {
/// Note that profiling needs to be explicitly enabled by setting `_RJEM_MALLOC_CONF=prof:true`
/// environment variable for the binary and with [`MemoryProfilerSettings::enabled`] being set
/// to `true`. Otherwise, this method will return `None`.
///
/// If syscall sandboxing is being used (see [`crate::security`] for more details), telemetry
/// must be initialized prior to syscall sandboxing.
pub fn get_or_init_with(settings: &MemoryProfilerSettings) -> BootstrapResult<Option<Self>> {
const MAX_SAMPLE_INTERVAL: u8 = 64;

@@ -83,7 +71,7 @@

PROFILER
.get_or_try_init(|| init_profiler(settings))
.copied()
.cloned()
}

/// Returns a heap profile.
@@ -110,29 +98,10 @@
/// }
/// ```
pub async fn heap_profile(&self) -> Result<String> {
// NOTE: we use tokio mutex here, so we can hold the lock across `await` points.
let Ok(_lock) = PROFILING_IN_PROGRESS_LOCK.try_lock() else {
return Err("profiling is already in progress".into());
};

#[cfg(feature = "security")]
let sandbox_profiling_syscalls = self.sandbox_profiling_syscalls;
let (response_sender, response_receiver) = oneshot::channel();
self.request_heap_profile.send(response_sender)?;

let collector_thread = thread::spawn(move || {
#[cfg(feature = "security")]
if sandbox_profiling_syscalls {
sandbox_jemalloc_syscalls()?;
}

collect_heap_profile()
});

spawn_blocking(move || {
collector_thread
.join()
.map_err(|_| "heap profile collector thread panicked")?
})
.await?
response_receiver.await?
}

/// Returns heap statistics.
@@ -169,6 +138,10 @@ fn init_profiler(settings: &MemoryProfilerSettings) -> BootstrapResult<Option<Me
return Ok(None);
}

let (request_sender, request_receiver) = mpsc::channel();

std::thread::spawn(move || heap_profile_thread(request_receiver));

control::write(control::BACKGROUND_THREAD, true).map_err(|e| {
BootstrapError::new(e).context("failed to activate background thread collection")
})?;
@@ -180,13 +153,20 @@ fn init_profiler(settings: &MemoryProfilerSettings) -> BootstrapResult<Option<Me
.map_err(|e| BootstrapError::new(e).context("failed to activate profiling"))?;

Ok(Some(MemoryProfiler {
_seal: Seal,

#[cfg(feature = "security")]
sandbox_profiling_syscalls: settings.sandbox_profiling_syscalls,
request_heap_profile: request_sender,
}))
}

fn heap_profile_thread(receive_request: mpsc::Receiver<oneshot::Sender<Result<String>>>) {
while let Ok(send_response) = receive_request.recv() {
if send_response.send(collect_heap_profile()).is_err() {
// A failure to send indicates the main thread's receiver is gone, so something else
// has already gone wrong there.
return;
}
}
}

fn collect_heap_profile() -> Result<String> {
let out_file = NamedTempFile::new()?;

@@ -212,50 +192,50 @@ fn collect_heap_profile() -> Result<String> {
Ok(String::from_utf8(profile)?)
}

#[cfg(feature = "security")]
fn sandbox_jemalloc_syscalls() -> Result<()> {
#[cfg(target_arch = "x86_64")]
allow_list! {
static ALLOWED_SYSCALLS = [
..SERVICE_BASICS,
// PXY-41: Required to call Instant::now from parking-lot.
clock_gettime,
openat,
creat,
unlink
]
}

#[cfg(target_arch = "aarch64")]
allow_list! {
static ALLOWED_SYSCALLS = [
..SERVICE_BASICS,
clock_gettime,
openat,
unlinkat
]
}

enable_syscall_sandboxing(ViolationAction::KillProcess, &ALLOWED_SYSCALLS)?;

Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
use crate::telemetry::settings::MemoryProfilerSettings;
use crate::{
security::{
allow_list,
common_syscall_allow_lists::{ASYNC, SERVICE_BASICS},
enable_syscall_sandboxing, ViolationAction,
},
telemetry::settings::MemoryProfilerSettings,
Collaborator:

Nit: don't use nested imports. Instead, introduce a separate use for each non-leaf path, e.g.:

use crate::security::{allow_list, enable_syscall_sandboxing, ViolationAction};
use crate::security::common_syscall_allow_lists::{ASYNC, SERVICE_BASICS};
...

Contributor Author:

Ah, the joys of auto-importing. Too bad rustfmt's imports_granularity = "Module" config isn't stabilized yet.

};

#[test]
fn sample_interval_out_of_bounds() {
assert!(MemoryProfiler::get_or_init_with(&MemoryProfilerSettings {
enabled: true,
sample_interval: 128,
..Default::default()
})
.is_err());
}

#[tokio::test]
async fn profile_heap_after_seccomp_initialized() {
let profiler = MemoryProfiler::get_or_init_with(&MemoryProfilerSettings {
Collaborator (@inikulin, Jul 18, 2024):

As far as I can tell, this will still fail: MemoryProfiler will usually be initialised in production on the first request to the telemetry server for a heap profile, and that happens after the whole app has been spun up and seccomp has been initialised on the main thread.

It seems the way to solve that is to start the profiling thread in telemetry::init, which is recommended to be called before seccomp init on the main thread (though that's not reflected in the docs, only shown in the example; we should probably update the docs here).

The task sender for the thread will be stored in a global var (like the profiler itself), so once we have the profiler initialized, it can tell the thread to collect a profile via the global sender. Additionally, we can put the sender under a mutex; this way we can remove PROFILING_IN_PROGRESS_LOCK.
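
A minimal sketch of that proposed shape (illustrative only; the ProfileResult alias, the function names, and the stubbed collection are assumptions, not code from this PR):

```rust
use once_cell::sync::OnceCell;
use std::sync::mpsc;
use tokio::sync::{oneshot, Mutex as AsyncMutex};

// Placeholder for the crate's Result<String> alias.
type ProfileResult = Result<String, String>;

// Global request channel for the long-lived profiling thread. Keeping the
// sender under an async mutex also serializes profile requests, which is
// what would make PROFILING_IN_PROGRESS_LOCK unnecessary.
static HEAP_PROFILE_REQUESTS: OnceCell<AsyncMutex<mpsc::Sender<oneshot::Sender<ProfileResult>>>> =
    OnceCell::new();

// Called from telemetry::init, i.e. before seccomp is set up on the main thread.
fn start_profiling_thread() {
    let (tx, rx) = mpsc::channel::<oneshot::Sender<ProfileResult>>();

    std::thread::spawn(move || {
        // The thread outlives sandboxing, so profile collection keeps working.
        while let Ok(respond_to) = rx.recv() {
            let _ = respond_to.send(collect_profile_stub());
        }
    });

    let _ = HEAP_PROFILE_REQUESTS.set(AsyncMutex::new(tx));
}

// Stand-in for the real jemalloc-based collect_heap_profile().
fn collect_profile_stub() -> ProfileResult {
    Ok(String::new())
}
```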

Contributor Author:

With my changes, the MemoryProfiler is initialized when telemetry::server::init is called, which is called by telemetry::init. I'll make sure the recommendation to call telemetry::init before seccomp setup is in the docs.

Regarding the locking -- if the profiler itself is stored in a global variable, and the only way to get the profiler is through the global variable, I'm not sure I see the point of pulling the sender into its own global variable rather than letting it continue living inside the profiler.

As far as the PROFILING_IN_PROGRESS_LOCK goes, looking at it now, I actually don't think we need that any more. If multiple requests come in at the same time, they'll simply queue up and each be processed in turn by the profiling thread's loop. I'll look into removing that altogether.

Contributor Author:

I've added docs and removed the PROFILING_IN_PROGRESS_LOCK. As a sanity check, I used Apache Bench to hit a local version of a service using this branch, making 10k heap profile requests with a concurrency level of 100. High concurrency meant high response times, of course, but there were no failures.

Apache Bench Output
$ ab -n 10000 -c 100 -l localhost:7800/pprof/heap
This is ApacheBench, Version 2.3 <$Revision: 1879490 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking localhost (be patient)
Completed 1000 requests
Completed 2000 requests
Completed 3000 requests
Completed 4000 requests
Completed 5000 requests
Completed 6000 requests
Completed 7000 requests
Completed 8000 requests
Completed 9000 requests
Completed 10000 requests
Finished 10000 requests


Server Software:        
Server Hostname:        localhost
Server Port:            7800

Document Path:          /pprof/heap
Document Length:        Variable

Concurrency Level:      100
Time taken for tests:   1.236 seconds
Complete requests:      10000
Failed requests:        0
Total transferred:      88918063 bytes
HTML transferred:       87658063 bytes
Requests per second:    8091.14 [#/sec] (mean)
Time per request:       12.359 [ms] (mean)
Time per request:       0.124 [ms] (mean, across all concurrent requests)
Transfer rate:          70258.63 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.1      0       1
Processing:     1   12   3.1     11      20
Waiting:        1   12   3.1     11      20
Total:          2   12   3.1     11      20

Percentage of the requests served within a certain time (ms)
  50%     11
  66%     13
  75%     14
  80%     15
  90%     17
  95%     19
  98%     19
  99%     19
 100%     20 (longest request)

Collaborator:

Even though running multiple profiles can be fine in artificial benchmarks, we don't want to run multiple profiles at a time on a heavily loaded server, as it: 1) can affect the performance of the server; 2) due to 1, can produce skewed results for other profiles, e.g. a timing discrepancy can cause pacing issues and introduce additional allocations; 3) due to the way our profiling pipeline is built (i.e. profiles are available globally once they are collected), we want to avoid doing the same job multiple times and just allow others to grab a profile that was already collected by someone else in the period when they requested theirs.

Collaborator:

> With my changes, the MemoryProfiler is initialized when telemetry::server::init is called,

I see. Let's also add docs for those who might use the profiler programmatically outside of the telemetry server: if they have seccomp enabled, it's recommended to init the profiler before that.

Contributor Author (@OmegaJak, Jul 22, 2024):

> we don't want to run multiple profiles at a time on a heavily loaded server

We don't run multiple profiles at a time. Requests enter into a queue which the profiling thread processes one at a time.

> due to the way our profiling pipeline is built (i.e. profiles are available globally once they are collected), we want to avoid doing the same job multiple times and just allow others to grab a profile that was already collected by someone else in the period when they requested theirs

Sure, we could do something like "if we receive profiling request A, and then request B comes in while we're still gathering A's, then return to B what we returned to A". But that seems like a premature optimization with unneeded complexity to me.

If you're opposed to the queuing functionality and want to match how it worked before my changes, we could avoid it by reintroducing a lock/mutex around the sender and returning an error if we can't immediately acquire the lock. While that does avoid possible server overload from profiling requests, via a sort of "only one at a time" rate limit, it feels like an unnecessary limitation of the server's functionality. If we do that, and user B requests a profile while user A's is still running, user B will just get an error and have to try again, instead of (as it currently works) getting their profile result a few milliseconds later when A's is done.

Collaborator (@inikulin, Jul 24, 2024):

> We don't run multiple profiles at a time. Requests enter into a queue which the profiling thread processes one at a time.

I don't really want to allow queuing profiles: we'd either need a sophisticated way to control and monitor the queue (e.g. metrics for the queue, durations, a way to cancel the queue, etc.) or we should just keep things simple as they were.

Some of the applications are extremely loaded and hypersensitive to performance changes, and at the same time practically anyone can request a profile. I can imagine people erroneously queuing profiles multiple times, and the only way we could abort such a queue is via a service restart.

Also, if people request profiles, they're probably running a specific experiment: making requests/connections, etc. So the timing is important; running a profile without a guarantee of when it will actually run is not very useful. To make matters worse, we don't even give the requester any feedback signal about whether the profile will run immediately or be queued.

So, tl;dr, yes, let's re-introduce the lock.

Contributor Author:

Lock re-introduced.
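
For reference, a minimal sketch of what the re-introduced lock could look like on top of the new channel, assuming the old PROFILING_IN_PROGRESS_LOCK tokio mutex static is restored (the final locked version isn't shown in the diff on this page):

```rust
pub async fn heap_profile(&self) -> Result<String> {
    // Reject concurrent requests instead of queuing them, matching the
    // pre-existing behaviour.
    let Ok(_lock) = PROFILING_IN_PROGRESS_LOCK.try_lock() else {
        return Err("profiling is already in progress".into());
    };

    let (response_sender, response_receiver) = oneshot::channel();
    self.request_heap_profile.send(response_sender)?;

    // The guard is held across this await, so only one profile runs at a time.
    response_receiver.await?
}
```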

enabled: true,
..Default::default()
})
.unwrap()
.unwrap_or_else(|| {
panic!("profiling should be enabled via `_RJEM_MALLOC_CONF=prof:true` env var");
});

allow_list! {
static ALLOW_PROFILING = [
..SERVICE_BASICS,
..ASYNC
]
}
enable_syscall_sandboxing(ViolationAction::KillProcess, &ALLOW_PROFILING).unwrap();

let profile = profiler.heap_profile().await.unwrap();
assert!(!profile.is_empty());
}

// NOTE: `heap_profile` uses raw pointers, the test ensures that it doesn't affect the returned future
fn _assert_heap_profile_fut_is_send() {
fn is_send<T: Send>(_t: T) {}
7 changes: 6 additions & 1 deletion foundations/src/telemetry/mod.rs
@@ -17,6 +17,10 @@
//! begining of the `main` function) with the [`init`] function for it to be collected by the
//! external sinks.
//!
//! If syscall sandboxing is also being used (see [`crate::security`] for more details), telemetry
//! must be initialized prior to syscall sandboxing, since it uses syscalls during initialization
//! that it will not use later.
//!
//! # Telemetry context
//!
//! Foundations' telemetry is designed to not interfere with the production code, so you usually don't
@@ -688,7 +692,7 @@ pub struct TelemetryConfig<'c> {
/// doesn't need to be called in tests and any specified settings will be ignored in test
/// environments. Instead, all the telemetry will be collected in the [`TestTelemetryContext`].
///
/// The function should be called once on service initialization. Consequent calls to the function
/// The function should be called once on service initialization (prior to any [syscall sandboxing]). Consequent calls to the function
/// don't have any effect.
///
/// # Telemetry server
@@ -707,6 +711,7 @@ pub struct TelemetryConfig<'c> {
/// [Prometheus text format]: https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format
/// [jemalloc]: https://github.com/jemalloc/jemalloc
/// [`TelemetryServerSettings::enabled`]: `crate::telemetry::settings::TelemetryServerSettings::enabled`
/// [syscall sandboxing]: `crate::security`
pub fn init(config: TelemetryConfig) -> BootstrapResult<TelemetryDriver> {
let tele_futures: FuturesUnordered<_> = Default::default();

7 changes: 6 additions & 1 deletion foundations/src/telemetry/server.rs
@@ -47,6 +47,11 @@ pub(super) fn init(
}

let settings = Arc::new(settings);

// Eagerly init the memory profiler so it gets set up before syscalls are sandboxed with seccomp.
#[cfg(all(target_os = "linux", feature = "memory-profiling"))]
memory_profiling::profiler(Arc::clone(&settings)).map_err(|err| anyhow!(err))?;

let router = create_router(&settings, custom_routes)?;
let addr = settings.server.addr;

@@ -162,7 +167,7 @@ mod memory_profiling {
use super::*;
use crate::telemetry::MemoryProfiler;

fn profiler(settings: Arc<TelemetrySettings>) -> Result<MemoryProfiler> {
pub(super) fn profiler(settings: Arc<TelemetrySettings>) -> Result<MemoryProfiler> {
MemoryProfiler::get_or_init_with(&settings.memory_profiler)?.ok_or_else(|| {
"profiling should be enabled via `_RJEM_MALLOC_CONF=prof:true` env var".into()
})
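
For services that enable seccomp themselves, the ordering the updated telemetry docs call for might look roughly like the sketch below; the allow list contents and the way TelemetryConfig is built are placeholders, and the import paths mirror the crate-internal ones used in the tests above rather than a verified public API.

```rust
use foundations::security::common_syscall_allow_lists::{ASYNC, SERVICE_BASICS};
use foundations::security::{allow_list, enable_syscall_sandboxing, ViolationAction};
use foundations::telemetry::{self, TelemetryConfig};

fn bootstrap(config: TelemetryConfig<'_>) {
    // Telemetry first: this eagerly spawns the long-lived heap profiling
    // thread while the process can still create threads freely. The returned
    // driver must be polled by the service's runtime (not shown here).
    let _driver = telemetry::init(config).expect("telemetry init failed");

    // Only after telemetry is up do we restrict syscalls with seccomp.
    allow_list! {
        static ALLOWED_SYSCALLS = [
            ..SERVICE_BASICS,
            ..ASYNC
        ]
    }
    enable_syscall_sandboxing(ViolationAction::KillProcess, &ALLOWED_SYSCALLS)
        .expect("seccomp setup failed");
}
```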
17 changes: 0 additions & 17 deletions foundations/src/telemetry/settings/memory_profiler.rs
@@ -17,14 +17,6 @@ pub struct MemoryProfilerSettings {
/// The default is `19` (2 ^ 19 = 512KiB).
#[serde(default = "MemoryProfilerSettings::default_sample_interval")]
pub sample_interval: u8,

/// Enables [seccomp] sandboxing of syscalls made by [jemalloc] during heap profile collection.
///
/// [seccomp]: https://en.wikipedia.org/wiki/Seccomp
/// [jemalloc]: https://github.com/jemalloc/jemalloc
#[cfg(feature = "security")]
#[serde(default = "MemoryProfilerSettings::default_sandbox_profiling_syscalls")]
pub sandbox_profiling_syscalls: bool,
}

#[cfg(not(feature = "settings"))]
@@ -33,10 +25,6 @@ impl Default for MemoryProfilerSettings {
Self {
enabled: false,
sample_interval: MemoryProfilerSettings::default_sample_interval(),

#[cfg(feature = "security")]
sandbox_profiling_syscalls: MemoryProfilerSettings::default_sandbox_profiling_syscalls(
),
}
}
}
@@ -45,9 +33,4 @@ impl MemoryProfilerSettings {
fn default_sample_interval() -> u8 {
19
}

#[cfg(feature = "security")]
fn default_sandbox_profiling_syscalls() -> bool {
true
}
}