Skip to content

Commit 15b1f5f

Browse files
authored
Trace memory alloc/dealloc by thread (#366)
1 parent 5042543 commit 15b1f5f

File tree

19 files changed

+445
-8
lines changed

19 files changed

+445
-8
lines changed

proxy_components/engine_store_ffi/Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ openssl-vendored = [
3232
"openssl/vendored"
3333
]
3434

35+
jemalloc = ["proxy_ffi/jemalloc"]
36+
external-jemalloc = ["proxy_ffi/external-jemalloc"]
37+
3538
[dependencies]
3639
batch-system = { workspace = true, default-features = false }
3740
bitflags = "1.0.1"

proxy_components/engine_store_ffi/src/core/forward_raft/region.rs

+8
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
1313
}
1414

1515
pub fn on_region_changed(&self, ob_region: &Region, e: RegionChangeEvent, r: StateRole) {
16+
self.engine_store_server_helper
17+
.maybe_jemalloc_register_alloc();
18+
self.engine_store_server_helper
19+
.directly_report_jemalloc_alloc();
1620
let region_id = ob_region.get_id();
1721
if e == RegionChangeEvent::Destroy {
1822
info!(
@@ -104,6 +108,10 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
104108
}
105109

106110
pub fn on_role_change(&self, ob_region: &Region, r: &RoleChange) {
111+
self.engine_store_server_helper
112+
.maybe_jemalloc_register_alloc();
113+
self.engine_store_server_helper
114+
.directly_report_jemalloc_alloc();
107115
let region_id = ob_region.get_id();
108116
let is_replicated = !r.initialized;
109117
let is_fap_enabled = if let Some(b) = self.engine.proxy_ext.config_set.as_ref() {

proxy_components/mock-engine-store/src/mock_cluster/test_utils.rs

-1
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,6 @@ pub fn maybe_collect_states(
9898
}
9999

100100
pub fn collect_all_states(cluster_ext: &ClusterExt, region_id: u64) -> HashMap<u64, States> {
101-
maybe_collect_states(cluster_ext, region_id, None);
102101
let prev_state = maybe_collect_states(cluster_ext, region_id, None);
103102
assert_eq!(
104103
prev_state.len(),

proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs

+72-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.
22

3-
use std::{cell::RefCell, pin::Pin, sync::atomic::Ordering};
3+
use std::{
4+
cell::RefCell,
5+
pin::Pin,
6+
sync::{atomic::Ordering, Mutex},
7+
};
48

59
use engine_store_ffi::TiFlashEngine;
610

@@ -18,6 +22,23 @@ use super::{
1822
};
1923
use crate::mock_cluster;
2024

25+
/// Addresses of one thread's allocation counters, as reported through
/// `ffi_report_thread_allocate_info`.
///
/// Each field holds the raw address of a `u64` counter, or `0` when that
/// counter has not been reported (the report handler stores `0` for the side
/// it has not yet received).
#[derive(Clone)]
pub struct ThreadInfoJealloc {
    /// Address of the "bytes allocated" counter; `0` means unknown.
    pub allocated_ptr: u64,
    /// Address of the "bytes deallocated" counter; `0` means unknown.
    pub deallocated_ptr: u64,
}

impl ThreadInfoJealloc {
    /// Reads the `u64` stored at `addr`, treating `0` as "no counter".
    ///
    /// A non-zero `addr` must still point to a live `u64`; this is the same
    /// requirement the unconditional dereference had before.
    fn read_counter(addr: u64) -> u64 {
        if addr == 0 {
            // Nothing was registered for this side, so there is nothing to read.
            return 0;
        }
        // SAFETY: `addr` is non-null here; the reporter is responsible for it
        // pointing at a live `u64` counter.
        unsafe { *(addr as *const u64) }
    }

    /// Current value of the allocation counter, or `0` if none was reported.
    pub fn allocated(&self) -> u64 {
        Self::read_counter(self.allocated_ptr)
    }

    /// Current value of the deallocation counter, or `0` if none was reported.
    pub fn deallocated(&self) -> u64 {
        Self::read_counter(self.deallocated_ptr)
    }

    /// Allocated minus deallocated, as a signed value.
    ///
    /// The difference is computed with wrapping arithmetic so that extreme
    /// counter values cannot trigger an overflow panic in debug builds; for
    /// counters below `i64::MAX` the result equals the plain signed difference.
    pub fn remaining(&self) -> i64 {
        self.allocated().wrapping_sub(self.deallocated()) as i64
    }
}
2142
pub struct EngineStoreServer {
2243
pub id: u64,
2344
// TODO engines maybe changed into TabletRegistry?
@@ -28,6 +49,7 @@ pub struct EngineStoreServer {
2849
pub page_storage: MockPageStorage,
2950
// (region_id, peer_id) -> MockRegion
3051
pub tmp_fap_regions: HashMap<RegionId, Box<MockRegion>>,
52+
pub thread_info_map: Mutex<HashMap<String, ThreadInfoJealloc>>,
3153
}
3254

3355
impl EngineStoreServer {
@@ -40,6 +62,7 @@ impl EngineStoreServer {
4062
region_states: RefCell::new(Default::default()),
4163
page_storage: Default::default(),
4264
tmp_fap_regions: Default::default(),
65+
thread_info_map: Default::default(),
4366
}
4467
}
4568

@@ -369,6 +392,8 @@ pub fn gen_engine_store_server_helper(
369392
fn_query_fap_snapshot_state: Some(ffi_query_fap_snapshot_state),
370393
fn_kvstore_region_exists: Some(ffi_kvstore_region_exists),
371394
fn_clear_fap_snapshot: Some(ffi_clear_fap_snapshot),
395+
fn_report_thread_allocate_info: Some(ffi_report_thread_allocate_info),
396+
fn_report_thread_allocate_batch: Some(ffi_report_thread_allocate_batch),
372397
ps: PageStorageInterfaces {
373398
fn_create_write_batch: Some(ffi_mockps_create_write_batch),
374399
fn_wb_put_page: Some(ffi_mockps_wb_put_page),
@@ -609,3 +634,49 @@ unsafe extern "C" fn ffi_get_lock_by_key(
609634
},
610635
}
611636
}
637+
638+
unsafe extern "C" fn ffi_report_thread_allocate_info(
639+
arg1: *mut interfaces_ffi::EngineStoreServerWrap,
640+
_: u64,
641+
name: interfaces_ffi::BaseBuffView,
642+
t: interfaces_ffi::ReportThreadAllocateInfoType,
643+
value: u64,
644+
) {
645+
let store = into_engine_store_server_wrap(arg1);
646+
let tn = std::str::from_utf8(name.to_slice()).unwrap().to_string();
647+
match (*store.engine_store_server)
648+
.thread_info_map
649+
.lock()
650+
.expect("poisoned")
651+
.entry(tn)
652+
{
653+
std::collections::hash_map::Entry::Occupied(mut o) => {
654+
if t == interfaces_ffi::ReportThreadAllocateInfoType::AllocPtr {
655+
o.get_mut().allocated_ptr = value;
656+
} else if t == interfaces_ffi::ReportThreadAllocateInfoType::DeallocPtr {
657+
o.get_mut().deallocated_ptr = value;
658+
}
659+
}
660+
std::collections::hash_map::Entry::Vacant(v) => {
661+
if t == interfaces_ffi::ReportThreadAllocateInfoType::AllocPtr {
662+
v.insert(ThreadInfoJealloc {
663+
allocated_ptr: value,
664+
deallocated_ptr: 0,
665+
});
666+
} else if t == interfaces_ffi::ReportThreadAllocateInfoType::DeallocPtr {
667+
v.insert(ThreadInfoJealloc {
668+
allocated_ptr: 0,
669+
deallocated_ptr: value,
670+
});
671+
}
672+
}
673+
}
674+
}
675+
676+
/// Mock handler for batched per-thread allocation reports.
///
/// Intentionally a no-op: every argument is ignored and nothing is recorded.
unsafe extern "C" fn ffi_report_thread_allocate_batch(
677+
_: *mut interfaces_ffi::EngineStoreServerWrap,
678+
_: u64,
679+
_name: interfaces_ffi::BaseBuffView,
680+
_data: interfaces_ffi::ReportThreadAllocateInfoBatch,
681+
) {
682+
}

proxy_components/proxy_ffi/Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,15 @@ test-engines-rocksdb = [
2222
test-engines-panic = [
2323
"engine_test/test-engines-panic",
2424
]
25+
jemalloc = []
2526

2627
# TODO use encryption/openssl-vendored if later supports
2728
openssl-vendored = [
2829
"openssl/vendored"
2930
]
3031

32+
external-jemalloc = []
33+
3134
[dependencies]
3235
encryption = { workspace = true, default-features = false }
3336
openssl = { workspace = true } # TODO only for feature

proxy_components/proxy_ffi/src/engine_store_helper_impls.rs

+108-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
// Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0.
2-
use std::pin::Pin;
2+
use std::{cell::RefCell, pin::Pin};
33

44
use kvproto::{kvrpcpb, metapb, raft_cmdpb};
55

@@ -34,6 +34,13 @@ pub fn gen_engine_store_server_helper(
3434
unsafe { &(*(engine_store_server_helper as *const EngineStoreServerHelper)) }
3535
}
3636

37+
// Per-thread state used by `maybe_jemalloc_register_alloc` and
// `directly_report_jemalloc_alloc`.
thread_local! {
38+
// Becomes true once `maybe_jemalloc_register_alloc` has run on this thread.
pub static JEMALLOC_REGISTERED: RefCell<bool> = RefCell::new(false);
39+
// (thread name, thread id) captured when the thread was registered.
pub static JEMALLOC_TNAME: RefCell<(String, u64)> = RefCell::new(Default::default());
40+
// Address returned by `get_allocatep_on_thread_start`; stays null if that was 0.
pub static JEMALLOC_ALLOCP: RefCell<*mut u64> = RefCell::new(std::ptr::null_mut());
41+
// Address returned by `get_deallocatep_on_thread_start`; stays null if that was 0.
pub static JEMALLOC_DEALLOCP: RefCell<*mut u64> = RefCell::new(std::ptr::null_mut());
42+
}
43+
3744
/// # Safety
3845
/// The lifetime of `engine_store_server_helper` is definitely longer than
3946
/// `ENGINE_STORE_SERVER_HELPER_PTR`.
@@ -49,6 +56,85 @@ pub fn set_server_info_resp(res: &kvproto::diagnosticspb::ServerInfoResponse, pt
4956
}
5057

5158
impl EngineStoreServerHelper {
59+
pub fn maybe_jemalloc_register_alloc(&self) {
60+
JEMALLOC_REGISTERED.with(|b| {
61+
if !*b.borrow() {
62+
unsafe {
63+
let ptr_alloc: u64 = crate::jemalloc_utils::get_allocatep_on_thread_start();
64+
let ptr_dealloc: u64 = crate::jemalloc_utils::get_deallocatep_on_thread_start();
65+
let thread_name = std::thread::current().name().unwrap_or("").to_string();
66+
let thread_id: u64 = std::thread::current().id().as_u64().into();
67+
(self.fn_report_thread_allocate_info.into_inner())(
68+
self.inner,
69+
thread_id,
70+
BaseBuffView::from(thread_name.as_bytes()),
71+
interfaces_ffi::ReportThreadAllocateInfoType::Reset,
72+
0,
73+
);
74+
(self.fn_report_thread_allocate_info.into_inner())(
75+
self.inner,
76+
thread_id,
77+
BaseBuffView::from(thread_name.as_bytes()),
78+
interfaces_ffi::ReportThreadAllocateInfoType::AllocPtr,
79+
ptr_alloc,
80+
);
81+
(self.fn_report_thread_allocate_info.into_inner())(
82+
self.inner,
83+
thread_id,
84+
BaseBuffView::from(thread_name.as_bytes()),
85+
interfaces_ffi::ReportThreadAllocateInfoType::DeallocPtr,
86+
ptr_dealloc,
87+
);
88+
89+
// Some threads are not everlasting, so we don't want TiFlash to directly access
90+
// the pointer.
91+
JEMALLOC_TNAME.with(|p| {
92+
*p.borrow_mut() = (thread_name, thread_id);
93+
});
94+
if ptr_alloc != 0 {
95+
JEMALLOC_ALLOCP.with(|p| {
96+
*p.borrow_mut() = ptr_alloc as *mut u64;
97+
});
98+
}
99+
if ptr_dealloc != 0 {
100+
JEMALLOC_DEALLOCP.with(|p| {
101+
*p.borrow_mut() = ptr_dealloc as *mut u64;
102+
});
103+
}
104+
}
105+
*(b.borrow_mut()) = true;
106+
}
107+
});
108+
}
109+
110+
pub fn directly_report_jemalloc_alloc(&self) {
111+
JEMALLOC_TNAME.with(|thread_info| unsafe {
112+
let a = JEMALLOC_ALLOCP.with(|p| {
113+
let p = *p.borrow_mut();
114+
if p.is_null() {
115+
return 0;
116+
}
117+
*p
118+
});
119+
let d = JEMALLOC_DEALLOCP.with(|p| {
120+
let p = *p.borrow_mut();
121+
if p.is_null() {
122+
return 0;
123+
}
124+
*p
125+
});
126+
(self.fn_report_thread_allocate_batch.into_inner())(
127+
self.inner,
128+
thread_info.borrow().1,
129+
BaseBuffView::from(thread_info.borrow().0.as_bytes()),
130+
interfaces_ffi::ReportThreadAllocateInfoBatch {
131+
alloc: a,
132+
dealloc: d,
133+
},
134+
);
135+
});
136+
}
137+
52138
pub fn gc_raw_cpp_ptr(&self, ptr: *mut ::std::os::raw::c_void, tp: RawCppPtrType) {
53139
debug_assert!(self.fn_gc_raw_cpp_ptr.is_some());
54140
unsafe {
@@ -82,6 +168,7 @@ impl EngineStoreServerHelper {
82168

83169
/// Calls `fn_handle_compute_store_stats` and returns its `StoreStats`.
///
/// Registers the calling thread for allocation tracing first; unlike most
/// sibling wrappers it does not also push the current counter values.
pub fn handle_compute_store_stats(&self) -> StoreStats {
84170
debug_assert!(self.fn_handle_compute_store_stats.is_some());
171+
self.maybe_jemalloc_register_alloc();
85172
unsafe { (self.fn_handle_compute_store_stats.into_inner())(self.inner) }
86173
}
87174

@@ -91,16 +178,22 @@ impl EngineStoreServerHelper {
91178
header: RaftCmdHeader,
92179
) -> EngineStoreApplyRes {
93180
debug_assert!(self.fn_handle_write_raft_cmd.is_some());
181+
self.maybe_jemalloc_register_alloc();
182+
self.directly_report_jemalloc_alloc();
94183
unsafe { (self.fn_handle_write_raft_cmd.into_inner())(self.inner, cmds.gen_view(), header) }
95184
}
96185

97186
/// Calls `fn_handle_get_engine_store_server_status` and returns its result.
///
/// Before the call, registers the calling thread for allocation tracing (if
/// not yet done) and reports its current counter values.
pub fn handle_get_engine_store_server_status(&self) -> EngineStoreServerStatus {
98187
debug_assert!(self.fn_handle_get_engine_store_server_status.is_some());
188+
self.maybe_jemalloc_register_alloc();
189+
self.directly_report_jemalloc_alloc();
99190
unsafe { (self.fn_handle_get_engine_store_server_status.into_inner())(self.inner) }
100191
}
101192

102193
/// Passes `proxy` to `fn_atomic_update_proxy`.
///
/// Before the call, registers the calling thread for allocation tracing (if
/// not yet done) and reports its current counter values.
pub fn handle_set_proxy(&self, proxy: *const RaftStoreProxyFFIHelper) {
103194
debug_assert!(self.fn_atomic_update_proxy.is_some());
195+
self.maybe_jemalloc_register_alloc();
196+
self.directly_report_jemalloc_alloc();
104197
unsafe { (self.fn_atomic_update_proxy.into_inner())(self.inner, proxy as *mut _) }
105198
}
106199

@@ -129,7 +222,8 @@ impl EngineStoreServerHelper {
129222
header: RaftCmdHeader,
130223
) -> EngineStoreApplyRes {
131224
debug_assert!(self.fn_handle_admin_raft_cmd.is_some());
132-
225+
self.maybe_jemalloc_register_alloc();
226+
self.directly_report_jemalloc_alloc();
133227
unsafe {
134228
let req = ProtoMsgBaseBuff::new(req);
135229
let resp = ProtoMsgBaseBuff::new(resp);
@@ -158,6 +252,8 @@ impl EngineStoreServerHelper {
158252
term: u64,
159253
) -> bool {
160254
debug_assert!(self.fn_try_flush_data.is_some());
255+
self.maybe_jemalloc_register_alloc();
256+
self.directly_report_jemalloc_alloc();
161257
// TODO(proactive flush)
162258
unsafe {
163259
(self.fn_try_flush_data.into_inner())(
@@ -187,6 +283,8 @@ impl EngineStoreServerHelper {
187283
) -> RawCppPtr {
188284
debug_assert!(self.fn_pre_handle_snapshot.is_some());
189285

286+
self.maybe_jemalloc_register_alloc();
287+
self.directly_report_jemalloc_alloc();
190288
let snaps_view = into_sst_views(snaps);
191289
unsafe {
192290
let region = ProtoMsgBaseBuff::new(region);
@@ -203,13 +301,17 @@ impl EngineStoreServerHelper {
203301

204302
/// Passes the pointer and type of `snap` to `fn_apply_pre_handled_snapshot`.
///
/// Before the call, registers the calling thread for allocation tracing (if
/// not yet done) and reports its current counter values.
pub fn apply_pre_handled_snapshot(&self, snap: RawCppPtr) {
205303
debug_assert!(self.fn_apply_pre_handled_snapshot.is_some());
304+
self.maybe_jemalloc_register_alloc();
305+
self.directly_report_jemalloc_alloc();
206306
unsafe {
207307
(self.fn_apply_pre_handled_snapshot.into_inner())(self.inner, snap.ptr, snap.type_)
208308
}
209309
}
210310

211311
/// Passes `region_id` and `peer_id` to `fn_abort_pre_handle_snapshot`.
///
/// Before the call, registers the calling thread for allocation tracing (if
/// not yet done) and reports its current counter values.
pub fn abort_pre_handle_snapshot(&self, region_id: u64, peer_id: u64) {
212312
debug_assert!(self.fn_abort_pre_handle_snapshot.is_some());
313+
self.maybe_jemalloc_register_alloc();
314+
self.directly_report_jemalloc_alloc();
213315
unsafe { (self.fn_abort_pre_handle_snapshot.into_inner())(self.inner, region_id, peer_id) }
214316
}
215317

@@ -277,6 +379,8 @@ impl EngineStoreServerHelper {
277379
) -> EngineStoreApplyRes {
278380
debug_assert!(self.fn_handle_ingest_sst.is_some());
279381

382+
self.maybe_jemalloc_register_alloc();
383+
self.directly_report_jemalloc_alloc();
280384
let snaps_view = into_sst_views(snaps);
281385
unsafe {
282386
(self.fn_handle_ingest_sst.into_inner())(
@@ -290,6 +394,8 @@ impl EngineStoreServerHelper {
290394
pub fn handle_destroy(&self, region_id: u64) {
291395
debug_assert!(self.fn_handle_destroy.is_some());
292396

397+
self.maybe_jemalloc_register_alloc();
398+
self.directly_report_jemalloc_alloc();
293399
unsafe {
294400
(self.fn_handle_destroy.into_inner())(self.inner, region_id);
295401
}

0 commit comments

Comments
 (0)