
[PROF-8667] Heap Profiling - Part 2 - Allocations #3282

Closed
6 changes: 5 additions & 1 deletion benchmarks/profiler_sample_loop_v2.rb
@@ -16,7 +16,11 @@ class ProfilerSampleLoopBenchmark
   PROFILER_OVERHEAD_STACK_THREAD = Thread.new { sleep }
 
   def create_profiler
-    @recorder = Datadog::Profiling::StackRecorder.new(cpu_time_enabled: true, alloc_samples_enabled: true)
+    @recorder = Datadog::Profiling::StackRecorder.new(
+      cpu_time_enabled: true,
+      alloc_samples_enabled: false,
+      heap_samples_enabled: false
+    )
     @collector = Datadog::Profiling::Collectors::ThreadContext.new(
       recorder: @recorder, max_frames: 400, tracer: nil, endpoint_collection_enabled: false, timeline_enabled: false
     )
ext/ddtrace_profiling_native_extension/collectors_cpu_and_wall_time_worker.c
@@ -80,17 +80,19 @@ struct cpu_and_wall_time_worker_state {
   // These are immutable after initialization
 
   bool gc_profiling_enabled;
-  bool allocation_counting_enabled;
   bool no_signals_workaround_enabled;
   bool dynamic_sampling_rate_enabled;
-  int allocation_sample_every; // Temporarily used for development/testing of allocation profiling
+  int allocation_sample_every;
+  bool allocation_profiling_enabled;
+  bool heap_profiling_enabled;
   VALUE self_instance;
   VALUE thread_context_collector_instance;
   VALUE idle_sampling_helper_instance;
   VALUE owner_thread;
   dynamic_sampling_rate_state dynamic_sampling_rate;
   VALUE gc_tracepoint; // Used to get gc start/finish information
   VALUE object_allocation_tracepoint; // Used to get allocation counts and allocation profiling
+  VALUE object_free_tracepoint; // Used to figure out live objects for heap profiling
 
   // These are mutable and used to signal things between the worker thread and other threads
 
@@ -149,10 +151,11 @@ static VALUE _native_initialize(
   VALUE thread_context_collector_instance,
   VALUE gc_profiling_enabled,
   VALUE idle_sampling_helper_instance,
-  VALUE allocation_counting_enabled,
   VALUE no_signals_workaround_enabled,
   VALUE dynamic_sampling_rate_enabled,
-  VALUE allocation_sample_every
+  VALUE allocation_sample_every,
+  VALUE allocation_profiling_enabled,
+  VALUE heap_profiling_enabled
 );
 static void cpu_and_wall_time_worker_typed_data_mark(void *state_ptr);
 static VALUE _native_sampling_loop(VALUE self, VALUE instance);
@@ -186,6 +189,7 @@ static void reset_stats(struct cpu_and_wall_time_worker_state *state);
 static void sleep_for(uint64_t time_ns);
 static VALUE _native_allocation_count(DDTRACE_UNUSED VALUE self);
 static void on_newobj_event(VALUE tracepoint_data, DDTRACE_UNUSED void *unused);
+static void on_freeobj_event(VALUE tracepoint_data, DDTRACE_UNUSED void *unused);
 static void disable_tracepoints(struct cpu_and_wall_time_worker_state *state);
 static VALUE _native_with_blocked_sigprof(DDTRACE_UNUSED VALUE self);
 static VALUE rescued_sample_allocation(VALUE tracepoint_data);
@@ -226,7 +230,7 @@ void collectors_cpu_and_wall_time_worker_init(VALUE profiling_module) {
   // https://bugs.ruby-lang.org/issues/18007 for a discussion around this.
   rb_define_alloc_func(collectors_cpu_and_wall_time_worker_class, _native_new);
 
-  rb_define_singleton_method(collectors_cpu_and_wall_time_worker_class, "_native_initialize", _native_initialize, 8);
+  rb_define_singleton_method(collectors_cpu_and_wall_time_worker_class, "_native_initialize", _native_initialize, 9);
   rb_define_singleton_method(collectors_cpu_and_wall_time_worker_class, "_native_sampling_loop", _native_sampling_loop, 1);
   rb_define_singleton_method(collectors_cpu_and_wall_time_worker_class, "_native_stop", _native_stop, 2);
   rb_define_singleton_method(collectors_cpu_and_wall_time_worker_class, "_native_reset_after_fork", _native_reset_after_fork, 1);
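Note the arity bump from 8 to 9: the Ruby-side caller of _native_initialize (not shown in this excerpt) now has to pass allocation_profiling_enabled and heap_profiling_enabled, while the old allocation_counting_enabled flag is gone.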
@@ -264,16 +268,18 @@ static VALUE _native_new(VALUE klass) {
   // being leaked.
 
   state->gc_profiling_enabled = false;
-  state->allocation_counting_enabled = false;
   state->no_signals_workaround_enabled = false;
   state->dynamic_sampling_rate_enabled = true;
   state->allocation_sample_every = 0;
+  state->allocation_profiling_enabled = false;
+  state->heap_profiling_enabled = false;
   state->thread_context_collector_instance = Qnil;
   state->idle_sampling_helper_instance = Qnil;
   state->owner_thread = Qnil;
   dynamic_sampling_rate_init(&state->dynamic_sampling_rate);
   state->gc_tracepoint = Qnil;
   state->object_allocation_tracepoint = Qnil;
+  state->object_free_tracepoint = Qnil;
 
   atomic_init(&state->should_run, false);
   state->failure_exception = Qnil;
@@ -292,34 +298,42 @@ static VALUE _native_initialize(
   VALUE thread_context_collector_instance,
   VALUE gc_profiling_enabled,
   VALUE idle_sampling_helper_instance,
-  VALUE allocation_counting_enabled,
   VALUE no_signals_workaround_enabled,
   VALUE dynamic_sampling_rate_enabled,
-  VALUE allocation_sample_every
+  VALUE allocation_sample_every,
+  VALUE allocation_profiling_enabled,
+  VALUE heap_profiling_enabled
 ) {
   ENFORCE_BOOLEAN(gc_profiling_enabled);
-  ENFORCE_BOOLEAN(allocation_counting_enabled);
   ENFORCE_BOOLEAN(no_signals_workaround_enabled);
   ENFORCE_BOOLEAN(dynamic_sampling_rate_enabled);
   ENFORCE_TYPE(allocation_sample_every, T_FIXNUM);
+  ENFORCE_BOOLEAN(allocation_profiling_enabled);
+  ENFORCE_BOOLEAN(heap_profiling_enabled);
 
   struct cpu_and_wall_time_worker_state *state;
   TypedData_Get_Struct(self_instance, struct cpu_and_wall_time_worker_state, &cpu_and_wall_time_worker_typed_data, state);
 
   state->gc_profiling_enabled = (gc_profiling_enabled == Qtrue);
-  state->allocation_counting_enabled = (allocation_counting_enabled == Qtrue);
   state->no_signals_workaround_enabled = (no_signals_workaround_enabled == Qtrue);
   state->dynamic_sampling_rate_enabled = (dynamic_sampling_rate_enabled == Qtrue);
   state->allocation_sample_every = NUM2INT(allocation_sample_every);
+  state->allocation_profiling_enabled = (allocation_profiling_enabled == Qtrue);
+  state->heap_profiling_enabled = (heap_profiling_enabled == Qtrue);
 
-  if (state->allocation_sample_every < 0) {
-    rb_raise(rb_eArgError, "Unexpected value for allocation_sample_every: %d. This value must be >= 0.", state->allocation_sample_every);
+  if (state->allocation_sample_every <= 0) {
+    rb_raise(rb_eArgError, "Unexpected value for allocation_sample_every: %d. This value must be > 0.", state->allocation_sample_every);
   }
 
+  if (state->heap_profiling_enabled && !state->allocation_profiling_enabled) {
+    rb_raise(rb_eArgError, "Heap profiling requires allocation profiling to be enabled but it isn't.");
+  }
+
   state->thread_context_collector_instance = enforce_thread_context_collector_instance(thread_context_collector_instance);
   state->idle_sampling_helper_instance = idle_sampling_helper_instance;
   state->gc_tracepoint = rb_tracepoint_new(Qnil, RUBY_INTERNAL_EVENT_GC_ENTER | RUBY_INTERNAL_EVENT_GC_EXIT, on_gc_event, NULL /* unused */);
   state->object_allocation_tracepoint = rb_tracepoint_new(Qnil, RUBY_INTERNAL_EVENT_NEWOBJ, on_newobj_event, NULL /* unused */);
+  state->object_free_tracepoint = rb_tracepoint_new(Qnil, RUBY_INTERNAL_EVENT_FREEOBJ, on_freeobj_event, NULL /* unused */);
 
   return Qtrue;
 }
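Two details of the tightened validation are worth spelling out: allocation_sample_every is now used directly as a modulo divisor in on_newobj_event (the old `> 0` guard there is removed further down), so a value of 0 must be rejected here to avoid a division by zero; and heap profiling can only mark objects as live if allocation profiling sampled them in the first place (see the track_object call in thread_context_collector_sample_allocation below), hence the dependency check.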
@@ -335,6 +349,7 @@ static void cpu_and_wall_time_worker_typed_data_mark(void *state_ptr) {
   rb_gc_mark(state->stop_thread);
   rb_gc_mark(state->gc_tracepoint);
   rb_gc_mark(state->object_allocation_tracepoint);
+  rb_gc_mark(state->object_free_tracepoint);
 }
 
 // Called in a background thread created in CpuAndWallTimeWorker#start
@@ -632,7 +647,8 @@ static VALUE release_gvl_and_run_sampling_trigger_loop(VALUE instance) {
   // because they may raise exceptions.
   install_sigprof_signal_handler(handle_sampling_signal, "handle_sampling_signal");
   if (state->gc_profiling_enabled) rb_tracepoint_enable(state->gc_tracepoint);
-  if (state->allocation_counting_enabled) rb_tracepoint_enable(state->object_allocation_tracepoint);
+  if (state->allocation_profiling_enabled) rb_tracepoint_enable(state->object_allocation_tracepoint);
+  if (state->heap_profiling_enabled) rb_tracepoint_enable(state->object_free_tracepoint);
 
   rb_thread_call_without_gvl(run_sampling_trigger_loop, state, interrupt_sampling_trigger_loop, state);
 
@@ -888,9 +904,9 @@ static void sleep_for(uint64_t time_ns) {
 }
 
 static VALUE _native_allocation_count(DDTRACE_UNUSED VALUE self) {
-  bool is_profiler_running = active_sampler_instance_state != NULL;
+  bool are_allocations_being_tracked = active_sampler_instance_state != NULL && active_sampler_instance_state->allocation_profiling_enabled;
 
-  return is_profiler_running ? ULL2NUM(allocation_count) : Qnil;
+  return are_allocations_being_tracked ? ULL2NUM(allocation_count) : Qnil;
 }
 
 // Implements memory-related profiling events. This function is called by Ruby via the `object_allocation_tracepoint`
@@ -916,25 +932,42 @@ static void on_newobj_event(VALUE tracepoint_data, DDTRACE_UNUSED void *unused)
     return;
   }
 
   // @ivoanjo: Strictly speaking, this is not needed because Ruby should not call the same tracepoint while a previous
   // invocation is still pending, (e.g. it wouldn't call `on_newobj_event` while it's already running), but I decided
   // to keep this here for consistency -- every call to the thread context (other than the special gc calls which are
   // defined as not being able to allocate) sets this.
   state->during_sample = true;
 
-  // TODO: This is a placeholder sampling decision strategy. We plan to replace it with a better one soon (e.g. before
-  // beta), and having something here allows us to test the rest of feature, sampling decision aside.
-  if (state->allocation_sample_every > 0 && ((allocation_count % state->allocation_sample_every) == 0)) {
+  if (((allocation_count % state->allocation_sample_every) == 0)) {
     // Rescue against any exceptions that happen during sampling
     safely_call(rescued_sample_allocation, tracepoint_data, state->self_instance);
   }
 
   state->during_sample = false;
 }
 
+// Safety: This function may get called while Ruby is doing garbage collection. While Ruby is doing garbage collection,
+// *NO ALLOCATION* is allowed. This function, and any it calls must never trigger memory or object allocation.
+// This includes exceptions and use of ruby_xcalloc (because xcalloc can trigger GC)!
+static void on_freeobj_event(VALUE tracepoint_data, DDTRACE_UNUSED void *unused) {
+  struct cpu_and_wall_time_worker_state *state = active_sampler_instance_state; // Read from global variable, see "sampler global state safety" note above
+
+  // This should not happen in a normal situation because the tracepoint is always enabled after the instance is set
+  // and disabled before it is cleared, but just in case...
+  if (state == NULL) return;
+
+  // NOTE: Because this is likely to be happening during GC, handling of this tracepoint does not do any allocation.
+  // We also do not want to lose any frees as that would affect the accuracy of our live heap tracking so we skip
+  // the typical `state->during_sample` dropping that other sampling tracepoints have.
+
+  rb_trace_arg_t *data = rb_tracearg_from_tracepoint(tracepoint_data);
+  VALUE freed_object = rb_tracearg_object(data);
+
+  thread_context_collector_sample_free(state->thread_context_collector_instance, freed_object);
+}
+
 static void disable_tracepoints(struct cpu_and_wall_time_worker_state *state) {
   rb_tracepoint_disable(state->gc_tracepoint);
   rb_tracepoint_disable(state->object_allocation_tracepoint);
+  rb_tracepoint_disable(state->object_free_tracepoint);
 }
 
 static VALUE _native_with_blocked_sigprof(DDTRACE_UNUSED VALUE self) {
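For intuition, the sampling decision in on_newobj_event above is a plain every-Nth strategy: with allocation_sample_every set to, say, 50, allocations number 50, 100, 150, ... get sampled, and each sample should then carry a weight of 50 so that aggregated counts approximate the true totals. Below is a minimal standalone sketch of that decision; record_sample is a hypothetical stand-in for the real safely_call(rescued_sample_allocation, ...) path, not an actual function in this codebase.

#include <stdio.h>

// Toy model of the every-Nth sampling decision in on_newobj_event.
static unsigned long long allocation_count = 0;
static const int allocation_sample_every = 50; // must be > 0, as _native_initialize now enforces

// Hypothetical stand-in for the real sampling call.
static void record_sample(int weight) {
  printf("sampled allocation #%llu with weight %d\n", allocation_count, weight);
}

static void on_allocation(void) {
  allocation_count++;
  if ((allocation_count % allocation_sample_every) == 0) {
    // Each recorded sample stands in for `allocation_sample_every` real allocations.
    record_sample(allocation_sample_every);
  }
}

int main(void) {
  for (int i = 0; i < 200; i++) on_allocation(); // samples at allocations 50, 100, 150, 200
  return 0;
}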
ext/ddtrace_profiling_native_extension/collectors_thread_context.c
@@ -1203,6 +1203,8 @@ void thread_context_collector_sample_allocation(VALUE self_instance, unsigned in
     }
   }
 
+  track_object(state->recorder_instance, new_object, sample_weight);
+
   trigger_sample_for_thread(
     state,
     /* thread: */ current_thread,
@@ -1216,6 +1218,16 @@ void thread_context_collector_sample_allocation(VALUE self_instance, unsigned in
   );
 }
 
+// Safety: This function may get called while Ruby is doing garbage collection. While Ruby is doing garbage collection,
+// *NO ALLOCATION* is allowed. This function, and any it calls must never trigger memory or object allocation.
+// This includes exceptions and use of ruby_xcalloc (because xcalloc can trigger GC)!
+void thread_context_collector_sample_free(VALUE self_instance, VALUE freed_object) {
+  struct thread_context_collector_state *state;
+  TypedData_Get_Struct(self_instance, struct thread_context_collector_state, &thread_context_collector_typed_data, state);
+
+  record_obj_free(state->recorder_instance, freed_object);
+}
+
 // This method exists only to enable testing Datadog::Profiling::Collectors::ThreadContext behavior using RSpec.
 // It SHOULD NOT be used for other purposes.
 static VALUE _native_sample_allocation(DDTRACE_UNUSED VALUE self, VALUE collector_instance, VALUE sample_weight, VALUE new_object) {
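Together with the track_object call added to thread_context_collector_sample_allocation above, this closes the live-heap bookkeeping loop: a sampled allocation registers the object with the recorder, a FREEOBJ event unregisters it, and whatever is still registered when a profile is serialized is the sampled live heap. The toy sketch below illustrates the pairing only; the real bookkeeping lives behind the StackRecorder's track_object / record_obj_free API and also carries weights and stack traces.

#include <stdio.h>
#include <stdlib.h>

// Toy live-object set: track on sampled allocation, untrack on free.
#define MAX_TRACKED 1024
static void *tracked[MAX_TRACKED];
static int tracked_count = 0;

static void track_object(void *obj) {
  if (tracked_count < MAX_TRACKED) tracked[tracked_count++] = obj;
}

static void record_obj_free(void *obj) {
  for (int i = 0; i < tracked_count; i++) {
    if (tracked[i] == obj) {
      tracked[i] = tracked[--tracked_count]; // swap-with-last removal
      return;
    }
  }
  // Not found: this object was never sampled at allocation time, so there is nothing
  // to do -- which is also why heap profiling requires allocation profiling.
}

int main(void) {
  void *a = malloc(1), *b = malloc(1);
  track_object(a);
  track_object(b);
  record_obj_free(a);
  printf("live tracked objects: %d\n", tracked_count); // 1 -- only b is still live
  free(a);
  free(b);
  return 0;
}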
ext/ddtrace_profiling_native_extension/collectors_thread_context.h
@@ -8,6 +8,7 @@ void thread_context_collector_sample(
   VALUE profiler_overhead_stack_thread
 );
 void thread_context_collector_sample_allocation(VALUE self_instance, unsigned int sample_weight, VALUE new_object);
+void thread_context_collector_sample_free(VALUE self_instance, VALUE freed_object);
 VALUE thread_context_collector_sample_after_gc(VALUE self_instance);
 void thread_context_collector_on_gc_start(VALUE self_instance);
 void thread_context_collector_on_gc_finish(VALUE self_instance);
5 changes: 5 additions & 0 deletions ext/ddtrace_profiling_native_extension/extconf.rb
@@ -120,6 +120,11 @@ def add_compiler_flag(flag)
 add_compiler_flag '-Wall'
 add_compiler_flag '-Wextra'
 
+if ENV['DDTRACE_DEBUG']
+  CONFIG['optflags'] = '-O0'
+  CONFIG['debugflags'] = '-ggdb3'
+end
+
 if RUBY_PLATFORM.include?('linux')
   # Supposedly, the correct way to do this is
   # ```
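A usage note on the extconf.rb change above: setting DDTRACE_DEBUG to any non-empty value compiles the native extension without optimizations (-O0) and with full debug info (-ggdb3), which makes it much friendlier to step through under gdb. Something like `DDTRACE_DEBUG=true bundle exec rake compile` should produce such a build, assuming the repo's usual rake-compiler setup.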