Skip to content

Commit 00e3324

Browse files
committed
initial
Signed-off-by: CalvinNeo <[email protected]>
1 parent cafa770 commit 00e3324

File tree

14 files changed

+127
-4
lines changed

14 files changed

+127
-4
lines changed

proxy_components/engine_store_ffi/Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ openssl-vendored = [
3232
"openssl/vendored"
3333
]
3434

35+
external-jemalloc = ["proxy_ffi/external-jemalloc"]
36+
3537
[dependencies]
3638
batch-system = { workspace = true, default-features = false }
3739
bitflags = "1.0.1"

proxy_components/engine_store_ffi/src/core/forward_raft/region.rs

+2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
1313
}
1414

1515
pub fn on_region_changed(&self, ob_region: &Region, e: RegionChangeEvent, r: StateRole) {
16+
self.engine_store_server_helper.maybe_jemalloc_register_alloc();
1617
let region_id = ob_region.get_id();
1718
if e == RegionChangeEvent::Destroy {
1819
info!(
@@ -104,6 +105,7 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
104105
}
105106

106107
pub fn on_role_change(&self, ob_region: &Region, r: &RoleChange) {
108+
self.engine_store_server_helper.maybe_jemalloc_register_alloc();
107109
let region_id = ob_region.get_id();
108110
let is_replicated = !r.initialized;
109111
let is_fap_enabled = if let Some(b) = self.engine.proxy_ext.config_set.as_ref() {

proxy_components/mock-engine-store/src/mock_cluster/test_utils.rs

-1
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,6 @@ pub fn maybe_collect_states(
9898
}
9999

100100
pub fn collect_all_states(cluster_ext: &ClusterExt, region_id: u64) -> HashMap<u64, States> {
101-
maybe_collect_states(cluster_ext, region_id, None);
102101
let prev_state = maybe_collect_states(cluster_ext, region_id, None);
103102
assert_eq!(
104103
prev_state.len(),

proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs

+58
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.
22

33
use std::{cell::RefCell, pin::Pin, sync::atomic::Ordering};
4+
use std::sync::Mutex;
45

56
use engine_store_ffi::TiFlashEngine;
67

@@ -18,6 +19,27 @@ use super::{
1819
};
1920
use crate::mock_cluster;
2021

22+
#[derive(Clone)]
23+
pub struct ThreadInfoJealloc {
24+
pub allocated_ptr: u64,
25+
pub deallocated_ptr: u64,
26+
}
27+
28+
impl ThreadInfoJealloc {
29+
pub fn allocated(&self) -> u64 {
30+
unsafe {
31+
*(self.allocated_ptr as *const u64)
32+
}
33+
}
34+
pub fn deallocated(&self) -> u64 {
35+
unsafe {
36+
*(self.deallocated_ptr as *const u64)
37+
}
38+
}
39+
pub fn remaining(&self) -> i64 {
40+
self.allocated() as i64 - self.deallocated() as i64
41+
}
42+
}
2143
pub struct EngineStoreServer {
2244
pub id: u64,
2345
// TODO engines maybe changed into TabletRegistry?
@@ -28,6 +50,7 @@ pub struct EngineStoreServer {
2850
pub page_storage: MockPageStorage,
2951
// (region_id, peer_id) -> MockRegion
3052
pub tmp_fap_regions: HashMap<RegionId, Box<MockRegion>>,
53+
pub thread_info_map: Mutex<HashMap<String, ThreadInfoJealloc>>,
3154
}
3255

3356
impl EngineStoreServer {
@@ -40,6 +63,7 @@ impl EngineStoreServer {
4063
region_states: RefCell::new(Default::default()),
4164
page_storage: Default::default(),
4265
tmp_fap_regions: Default::default(),
66+
thread_info_map: Default::default(),
4367
}
4468
}
4569

@@ -369,6 +393,7 @@ pub fn gen_engine_store_server_helper(
369393
fn_query_fap_snapshot_state: Some(ffi_query_fap_snapshot_state),
370394
fn_kvstore_region_exists: Some(ffi_kvstore_region_exists),
371395
fn_clear_fap_snapshot: Some(ffi_clear_fap_snapshot),
396+
fn_report_thread_allocate_info: Some(ffi_report_thread_allocate_info),
372397
ps: PageStorageInterfaces {
373398
fn_create_write_batch: Some(ffi_mockps_create_write_batch),
374399
fn_wb_put_page: Some(ffi_mockps_wb_put_page),
@@ -609,3 +634,36 @@ unsafe extern "C" fn ffi_get_lock_by_key(
609634
},
610635
}
611636
}
637+
638+
unsafe extern "C" fn ffi_report_thread_allocate_info(
639+
arg1: *mut interfaces_ffi::EngineStoreServerWrap,
640+
name: interfaces_ffi::BaseBuffView,
641+
t: u64,
642+
value: u64,
643+
) {
644+
let store = into_engine_store_server_wrap(arg1);
645+
let tn = std::str::from_utf8(name.to_slice()).unwrap().to_string();
646+
match (*store.engine_store_server).thread_info_map.lock().expect("poisoned").entry(tn) {
647+
std::collections::hash_map::Entry::Occupied(mut o) => {
648+
if t == 0 {
649+
o.get_mut().allocated_ptr = value;
650+
} else {
651+
o.get_mut().deallocated_ptr = value;
652+
}
653+
}
654+
std::collections::hash_map::Entry::Vacant(v) => {
655+
if t == 0 {
656+
v.insert(ThreadInfoJealloc {
657+
allocated_ptr: value,
658+
deallocated_ptr: 0
659+
});
660+
} else {
661+
v.insert(ThreadInfoJealloc {
662+
allocated_ptr: 0,
663+
deallocated_ptr: value
664+
});
665+
}
666+
}
667+
668+
}
669+
}

proxy_components/proxy_ffi/Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ openssl-vendored = [
2828
"openssl/vendored"
2929
]
3030

31+
external-jemalloc = []
32+
3133
[dependencies]
3234
encryption = { workspace = true, default-features = false }
3335
openssl = { workspace = true } # TODO only for feature

proxy_components/proxy_ffi/src/engine_store_helper_impls.rs

+41-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
// Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0.
22
use std::pin::Pin;
3+
use std::cell::RefCell;
34

45
use kvproto::{kvrpcpb, metapb, raft_cmdpb};
56

@@ -34,6 +35,10 @@ pub fn gen_engine_store_server_helper(
3435
unsafe { &(*(engine_store_server_helper as *const EngineStoreServerHelper)) }
3536
}
3637

38+
thread_local! {
39+
pub static JEMALLOC_REGISTERED: RefCell<bool> = RefCell::new(false);
40+
}
41+
3742
/// # Safety
3843
/// The lifetime of `engine_store_server_helper` is definitely longer than
3944
/// `ENGINE_STORE_SERVER_HELPER_PTR`.
@@ -49,6 +54,31 @@ pub fn set_server_info_resp(res: &kvproto::diagnosticspb::ServerInfoResponse, pt
4954
}
5055

5156
impl EngineStoreServerHelper {
57+
pub fn maybe_jemalloc_register_alloc(&self) {
58+
JEMALLOC_REGISTERED.with(|b| {
59+
if !*b.borrow() {
60+
unsafe {
61+
let ptr_alloc: u64 = crate::jemalloc_utils::get_allocatep_on_thread_start();
62+
let ptr_dealloc: u64 = crate::jemalloc_utils::get_deallocatep_on_thread_start();
63+
let thread_name = std::thread::current().name().unwrap_or("<proxy-unknown>").to_string();
64+
(self.fn_report_thread_allocate_info.into_inner())(
65+
self.inner,
66+
BaseBuffView::from(thread_name.as_bytes()),
67+
0,
68+
ptr_alloc
69+
);
70+
(self.fn_report_thread_allocate_info.into_inner())(
71+
self.inner,
72+
BaseBuffView::from(thread_name.as_bytes()),
73+
1,
74+
ptr_dealloc
75+
);
76+
}
77+
*(b.borrow_mut()) = true;
78+
}
79+
});
80+
}
81+
5282
pub fn gc_raw_cpp_ptr(&self, ptr: *mut ::std::os::raw::c_void, tp: RawCppPtrType) {
5383
debug_assert!(self.fn_gc_raw_cpp_ptr.is_some());
5484
unsafe {
@@ -82,6 +112,7 @@ impl EngineStoreServerHelper {
82112

83113
pub fn handle_compute_store_stats(&self) -> StoreStats {
84114
debug_assert!(self.fn_handle_compute_store_stats.is_some());
115+
self.maybe_jemalloc_register_alloc();
85116
unsafe { (self.fn_handle_compute_store_stats.into_inner())(self.inner) }
86117
}
87118

@@ -91,16 +122,19 @@ impl EngineStoreServerHelper {
91122
header: RaftCmdHeader,
92123
) -> EngineStoreApplyRes {
93124
debug_assert!(self.fn_handle_write_raft_cmd.is_some());
125+
self.maybe_jemalloc_register_alloc();
94126
unsafe { (self.fn_handle_write_raft_cmd.into_inner())(self.inner, cmds.gen_view(), header) }
95127
}
96128

97129
pub fn handle_get_engine_store_server_status(&self) -> EngineStoreServerStatus {
98130
debug_assert!(self.fn_handle_get_engine_store_server_status.is_some());
131+
self.maybe_jemalloc_register_alloc();
99132
unsafe { (self.fn_handle_get_engine_store_server_status.into_inner())(self.inner) }
100133
}
101134

102135
pub fn handle_set_proxy(&self, proxy: *const RaftStoreProxyFFIHelper) {
103136
debug_assert!(self.fn_atomic_update_proxy.is_some());
137+
self.maybe_jemalloc_register_alloc();
104138
unsafe { (self.fn_atomic_update_proxy.into_inner())(self.inner, proxy as *mut _) }
105139
}
106140

@@ -129,7 +163,7 @@ impl EngineStoreServerHelper {
129163
header: RaftCmdHeader,
130164
) -> EngineStoreApplyRes {
131165
debug_assert!(self.fn_handle_admin_raft_cmd.is_some());
132-
166+
self.maybe_jemalloc_register_alloc();
133167
unsafe {
134168
let req = ProtoMsgBaseBuff::new(req);
135169
let resp = ProtoMsgBaseBuff::new(resp);
@@ -158,6 +192,7 @@ impl EngineStoreServerHelper {
158192
term: u64,
159193
) -> bool {
160194
debug_assert!(self.fn_try_flush_data.is_some());
195+
self.maybe_jemalloc_register_alloc();
161196
// TODO(proactive flush)
162197
unsafe {
163198
(self.fn_try_flush_data.into_inner())(
@@ -187,6 +222,7 @@ impl EngineStoreServerHelper {
187222
) -> RawCppPtr {
188223
debug_assert!(self.fn_pre_handle_snapshot.is_some());
189224

225+
self.maybe_jemalloc_register_alloc();
190226
let snaps_view = into_sst_views(snaps);
191227
unsafe {
192228
let region = ProtoMsgBaseBuff::new(region);
@@ -203,13 +239,15 @@ impl EngineStoreServerHelper {
203239

204240
pub fn apply_pre_handled_snapshot(&self, snap: RawCppPtr) {
205241
debug_assert!(self.fn_apply_pre_handled_snapshot.is_some());
242+
self.maybe_jemalloc_register_alloc();
206243
unsafe {
207244
(self.fn_apply_pre_handled_snapshot.into_inner())(self.inner, snap.ptr, snap.type_)
208245
}
209246
}
210247

211248
pub fn abort_pre_handle_snapshot(&self, region_id: u64, peer_id: u64) {
212249
debug_assert!(self.fn_abort_pre_handle_snapshot.is_some());
250+
self.maybe_jemalloc_register_alloc();
213251
unsafe { (self.fn_abort_pre_handle_snapshot.into_inner())(self.inner, region_id, peer_id) }
214252
}
215253

@@ -277,6 +315,7 @@ impl EngineStoreServerHelper {
277315
) -> EngineStoreApplyRes {
278316
debug_assert!(self.fn_handle_ingest_sst.is_some());
279317

318+
self.maybe_jemalloc_register_alloc();
280319
let snaps_view = into_sst_views(snaps);
281320
unsafe {
282321
(self.fn_handle_ingest_sst.into_inner())(
@@ -290,6 +329,7 @@ impl EngineStoreServerHelper {
290329
pub fn handle_destroy(&self, region_id: u64) {
291330
debug_assert!(self.fn_handle_destroy.is_some());
292331

332+
self.maybe_jemalloc_register_alloc();
293333
unsafe {
294334
(self.fn_handle_destroy.into_inner())(self.inner, region_id);
295335
}

proxy_components/proxy_ffi/src/interfaces.rs

+9-1
Original file line numberDiff line numberDiff line change
@@ -750,6 +750,14 @@ pub mod root {
750750
region_id: u64,
751751
) -> bool,
752752
>,
753+
pub fn_report_thread_allocate_info: ::std::option::Option<
754+
unsafe extern "C" fn(
755+
arg1: *mut root::DB::EngineStoreServerWrap,
756+
name: root::DB::BaseBuffView,
757+
type_: u64,
758+
value: u64,
759+
),
760+
>,
753761
}
754762
extern "C" {
755763
pub fn ffi_get_server_info_from_proxy(
@@ -758,7 +766,7 @@ pub mod root {
758766
arg3: root::DB::RawVoidPtr,
759767
) -> u32;
760768
}
761-
pub const RAFT_STORE_PROXY_VERSION: u64 = 8589640407431546086;
769+
pub const RAFT_STORE_PROXY_VERSION: u64 = 10285342393410618515;
762770
pub const RAFT_STORE_PROXY_MAGIC_NUMBER: u32 = 324508639;
763771
}
764772
}

proxy_components/proxy_ffi/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ pub mod read_index_helper;
2323
// FFI releated with reading from SST/RocksDB files.
2424
pub mod snapshot_reader_impls;
2525
pub mod utils;
26+
pub mod jemalloc_utils;
2627

2728
pub use self::{
2829
basic_ffi_impls::*, domain_impls::*, encryption_impls::*, engine_store_helper_impls::*,

proxy_components/proxy_server/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ backup-stream-debug = ["backup-stream/backup-stream-debug"]
3535
testexport = ["engine_tiflash/testexport", "engine_store_ffi/testexport", "tikv/testexport"]
3636
pprof-fp = ["tikv/pprof-fp"]
3737
openssl-vendored = ["tikv/openssl-vendored", "openssl/vendored"]
38+
external-jemalloc = ["engine_store_ffi/external-jemalloc"]
3839

3940
[dependencies]
4041
api_version = { workspace = true }

proxy_components/proxy_server/src/run.rs

+5
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,11 @@ impl<ER: RaftEngine, F: KvFormat> TiKvServer<ER, F> {
618618
Arc::clone(&security_mgr),
619619
);
620620

621+
#[cfg(feature = "external-jemalloc")]
622+
info!("linked with external jemalloc");
623+
#[cfg(not(feature = "external-jemalloc"))]
624+
info!("linked without external jemalloc");
625+
621626
// Initialize and check config
622627
info!("using proxy config"; "config" => ?proxy_config);
623628
crate::config::address_proxy_config(&mut config, &proxy_config);

proxy_tests/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ sse = ["tikv/sse"]
4646
portable = ["tikv/portable"]
4747
openssl-vendored = ["tikv/openssl-vendored"]
4848
enable-pagestorage = []
49+
external-jemalloc = ["proxy_server/external-jemalloc", "engine_store_ffi/external-jemalloc"]
4950

5051
[dependencies]
5152
api_version = { workspace = true }

proxy_tests/proxy/shared/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,4 @@ mod server_cluster_test;
1414
mod snapshot;
1515
mod store;
1616
mod write;
17+
mod jemalloc;
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
#pragma once
22
#include <cstdint>
3-
namespace DB { constexpr uint64_t RAFT_STORE_PROXY_VERSION = 8589640407431546086ull; }
3+
namespace DB { constexpr uint64_t RAFT_STORE_PROXY_VERSION = 10285342393410618515ull; }

raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h

+3
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,9 @@ struct EngineStoreServerHelper {
373373
uint64_t, uint64_t);
374374
void (*fn_clear_fap_snapshot)(EngineStoreServerWrap *, uint64_t region_id);
375375
bool (*fn_kvstore_region_exists)(EngineStoreServerWrap *, uint64_t region_id);
376+
void (*fn_report_thread_allocate_info)(EngineStoreServerWrap *,
377+
BaseBuffView name, uint64_t type,
378+
uint64_t value);
376379
};
377380

378381
#ifdef __cplusplus

0 commit comments

Comments
 (0)