@@ -66,10 +66,12 @@ use crate::execution::memory_pools::{
66
66
use crate :: execution:: operators:: ScanExec ;
67
67
use crate :: execution:: shuffle:: { read_ipc_compressed, CompressionCodec } ;
68
68
use crate :: execution:: spark_plan:: SparkPlan ;
69
+
70
+ use crate :: execution:: tracing:: TraceGuard ;
71
+ use crate :: execution:: tracing:: { log_counter, trace_begin, trace_end} ;
72
+
69
73
use log:: info;
70
74
use once_cell:: sync:: Lazy ;
71
- #[ cfg( target_os = "linux" ) ]
72
- use procfs:: process:: Process ;
73
75
#[ cfg( feature = "jemalloc" ) ]
74
76
use tikv_jemalloc_ctl:: { epoch, stats} ;
75
77
@@ -131,7 +133,7 @@ struct ExecutionContext {
131
133
/// Memory pool config
132
134
pub memory_pool_config : MemoryPoolConfig ,
133
135
/// Whether to log memory usage on each call to execute_plan
134
- pub memory_profiling_enabled : bool ,
136
+ pub tracing_enabled : bool ,
135
137
}
136
138
137
139
/// Accept serialized query plan and return the address of the native query plan.
@@ -157,9 +159,11 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
157
159
task_attempt_id : jlong ,
158
160
debug_native : jboolean ,
159
161
explain_native : jboolean ,
160
- memory_profiling_enabled : jboolean ,
162
+ tracing_enabled : jboolean ,
161
163
) -> jlong {
162
164
try_unwrap_or_throw ( & e, |mut env| {
165
+ let _ = TraceGuard :: new ( "createPlan" , tracing_enabled != JNI_FALSE ) ;
166
+
163
167
// Init JVM classes
164
168
JVMClasses :: init ( & mut env) ;
165
169
@@ -238,7 +242,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
238
242
debug_native : debug_native == 1 ,
239
243
explain_native : explain_native == 1 ,
240
244
memory_pool_config,
241
- memory_profiling_enabled : memory_profiling_enabled != JNI_FALSE ,
245
+ tracing_enabled : tracing_enabled != JNI_FALSE ,
242
246
} ) ;
243
247
244
248
Ok ( Box :: into_raw ( exec_context) as i64 )
@@ -362,43 +366,22 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
362
366
exec_context : jlong ,
363
367
array_addrs : jlongArray ,
364
368
schema_addrs : jlongArray ,
369
+ tracing_enabled : jboolean ,
365
370
) -> jlong {
366
371
try_unwrap_or_throw ( & e, |mut env| {
372
+ let _ = TraceGuard :: new ( "executePlan" , tracing_enabled != JNI_FALSE ) ;
373
+
367
374
// Retrieve the query
368
375
let exec_context = get_execution_context ( exec_context) ;
369
376
370
- // memory profiling is only available on linux
371
- if exec_context. memory_profiling_enabled {
372
- #[ cfg( target_os = "linux" ) ]
377
+ if exec_context. tracing_enabled {
378
+ #[ cfg( feature = "jemalloc" ) ]
373
379
{
374
- let pid = std:: process:: id ( ) ;
375
- let process = Process :: new ( pid as i32 ) . unwrap ( ) ;
376
- let statm = process. statm ( ) . unwrap ( ) ;
377
- let page_size = procfs:: page_size ( ) ;
378
- println ! (
379
- "NATIVE_MEMORY: {{ resident: {:.0} }}" ,
380
- ( statm. resident * page_size) as f64 / ( 1024.0 * 1024.0 )
381
- ) ;
382
-
383
- #[ cfg( feature = "jemalloc" ) ]
384
- {
385
- // Obtain a MIB for the `epoch`, `stats.allocated`, and
386
- // `atats.resident` keys:
387
- let e = epoch:: mib ( ) . unwrap ( ) ;
388
- let allocated = stats:: allocated:: mib ( ) . unwrap ( ) ;
389
- let resident = stats:: resident:: mib ( ) . unwrap ( ) ;
390
-
391
- // Many statistics are cached and only updated
392
- // when the epoch is advanced:
393
- e. advance ( ) . unwrap ( ) ;
394
-
395
- // Read statistics using MIB key:
396
- let allocated = allocated. read ( ) . unwrap ( ) as f64 / ( 1024.0 * 1024.0 ) ;
397
- let resident = resident. read ( ) . unwrap ( ) as f64 / ( 1024.0 * 1024.0 ) ;
398
- println ! (
399
- "NATIVE_MEMORY_JEMALLOC: {{ allocated: {allocated:.0}, resident: {resident:.0} }}"
400
- ) ;
401
- }
380
+ let e = epoch:: mib ( ) . unwrap ( ) ;
381
+ let allocated = stats:: allocated:: mib ( ) . unwrap ( ) ;
382
+ e. advance ( ) . unwrap ( ) ;
383
+ use crate :: execution:: tracing:: log_counter;
384
+ log_counter ( "jemalloc_allocated" , allocated. read ( ) . unwrap ( ) as u64 ) ;
402
385
}
403
386
}
404
387
@@ -481,7 +464,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
481
464
) ;
482
465
}
483
466
}
484
-
485
467
return Ok ( -1 ) ;
486
468
}
487
469
// A poll pending means there are more than one blocking operators,
@@ -580,8 +562,11 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative
580
562
current_checksum : jlong ,
581
563
compression_codec : jstring ,
582
564
compression_level : jint ,
565
+ tracing_enabled : jboolean ,
583
566
) -> jlongArray {
584
567
try_unwrap_or_throw ( & e, |mut env| unsafe {
568
+ let _ = TraceGuard :: new ( "writeSortedFileNative" , tracing_enabled != JNI_FALSE ) ;
569
+
585
570
let data_types = convert_datatype_arrays ( & mut env, serialized_datatypes) ?;
586
571
587
572
let row_address_array = JLongArray :: from_raw ( row_addresses) ;
@@ -655,12 +640,13 @@ pub extern "system" fn Java_org_apache_comet_Native_sortRowPartitionsNative(
655
640
_class : JClass ,
656
641
address : jlong ,
657
642
size : jlong ,
643
+ tracing_enabled : jboolean ,
658
644
) {
659
645
try_unwrap_or_throw ( & e, |_| {
646
+ let _ = TraceGuard :: new ( "sortRowPartitionsNative" , tracing_enabled != JNI_FALSE ) ;
660
647
// SAFETY: JVM unsafe memory allocation is aligned with long.
661
648
let array = unsafe { std:: slice:: from_raw_parts_mut ( address as * mut i64 , size as usize ) } ;
662
649
array. rdxsort ( ) ;
663
-
664
650
Ok ( ( ) )
665
651
} )
666
652
}
@@ -676,12 +662,60 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_decodeShuffleBlock(
676
662
length : jint ,
677
663
array_addrs : jlongArray ,
678
664
schema_addrs : jlongArray ,
665
+ tracing_enabled : jboolean ,
679
666
) -> jlong {
680
667
try_unwrap_or_throw ( & e, |mut env| {
668
+ let _ = TraceGuard :: new ( "decodeShuffleBlock" , tracing_enabled != JNI_FALSE ) ;
681
669
let raw_pointer = env. get_direct_buffer_address ( & byte_buffer) ?;
682
670
let length = length as usize ;
683
671
let slice: & [ u8 ] = unsafe { std:: slice:: from_raw_parts ( raw_pointer, length) } ;
684
672
let batch = read_ipc_compressed ( slice) ?;
685
673
prepare_output ( & mut env, array_addrs, schema_addrs, batch, false )
686
674
} )
687
675
}
676
+
677
+ #[ no_mangle]
678
+ /// # Safety
679
+ /// This function is inherently unsafe since it deals with raw pointers passed from JNI.
680
+ pub unsafe extern "system" fn Java_org_apache_comet_Native_traceBegin (
681
+ e : JNIEnv ,
682
+ _class : JClass ,
683
+ event : jstring ,
684
+ ) {
685
+ try_unwrap_or_throw ( & e, |mut env| {
686
+ let name: String = env. get_string ( & JString :: from_raw ( event) ) . unwrap ( ) . into ( ) ;
687
+ trace_begin ( & name) ;
688
+ Ok ( ( ) )
689
+ } )
690
+ }
691
+
692
+ #[ no_mangle]
693
+ /// # Safety
694
+ /// This function is inherently unsafe since it deals with raw pointers passed from JNI.
695
+ pub unsafe extern "system" fn Java_org_apache_comet_Native_traceEnd (
696
+ e : JNIEnv ,
697
+ _class : JClass ,
698
+ event : jstring ,
699
+ ) {
700
+ try_unwrap_or_throw ( & e, |mut env| {
701
+ let name: String = env. get_string ( & JString :: from_raw ( event) ) . unwrap ( ) . into ( ) ;
702
+ trace_end ( & name) ;
703
+ Ok ( ( ) )
704
+ } )
705
+ }
706
+
707
+ #[ no_mangle]
708
+ /// # Safety
709
+ /// This function is inherently unsafe since it deals with raw pointers passed from JNI.
710
+ pub unsafe extern "system" fn Java_org_apache_comet_Native_logCounter (
711
+ e : JNIEnv ,
712
+ _class : JClass ,
713
+ name : jstring ,
714
+ value : jlong ,
715
+ ) {
716
+ try_unwrap_or_throw ( & e, |mut env| {
717
+ let name: String = env. get_string ( & JString :: from_raw ( name) ) . unwrap ( ) . into ( ) ;
718
+ log_counter ( & name, value as u64 ) ;
719
+ Ok ( ( ) )
720
+ } )
721
+ }
0 commit comments